diff options
Diffstat (limited to 'sql/log.cc')
-rw-r--r-- | sql/log.cc | 189 |
1 files changed, 174 insertions, 15 deletions
diff --git a/sql/log.cc b/sql/log.cc index 811921d8322..ac86441df32 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -57,6 +57,7 @@ #include "semisync_master.h" #include "sp_rcontext.h" #include "sp_head.h" +#include "sql_table.h" #include "wsrep_mysqld.h" #ifdef WITH_WSREP @@ -576,13 +577,137 @@ public: ulong binlog_id; /* Set if we get an error during commit that must be returned from unlog(). */ bool delayed_error; - + //Will be reset when gtid is written into binlog + uchar gtid_flags3; + decltype (rpl_gtid::seq_no) sa_seq_no; private: binlog_cache_mngr& operator=(const binlog_cache_mngr& info); binlog_cache_mngr(const binlog_cache_mngr& info); }; +/** + The function handles the first phase of two-phase binlogged ALTER. + On master binlogs START ALTER when that is configured to do so. + On slave START ALTER gets binlogged and its gtid committed into gtid slave pos + table. + + @param thd Thread handle. + @param start_alter_id Start Alter identifier or zero. + @param[out] + partial_alter Is set to true when Start Alter phase is completed. + @param if_exists True indicates the binary logging of the query + should be done with "if exists" option. + + @return false on success, true on failure + @return @c partial_alter set to @c true when START ALTER phase + has been completed +*/ +bool write_bin_log_start_alter(THD *thd, bool& partial_alter, + uint64 start_alter_id, bool if_exists) +{ +#if defined(HAVE_REPLICATION) + if (thd->variables.option_bits & OPTION_BIN_TMP_LOG_OFF) + return false; + + if (start_alter_id) + { + if (thd->rgi_slave->get_finish_event_group_called()) + return false; // can get here through retrying + + DBUG_EXECUTE_IF("at_write_start_alter", { + debug_sync_set_action(thd, + STRING_WITH_LEN("now wait_for alter_cont")); + }); + + Master_info *mi= thd->rgi_slave->rli->mi; + start_alter_info *info= thd->rgi_slave->sa_info; + bool is_shutdown= false; + + info->sa_seq_no= start_alter_id; + info->domain_id= thd->variables.gtid_domain_id; + mysql_mutex_lock(&mi->start_alter_list_lock); + // possible stop-slave's marking of the whole alter state list is checked + is_shutdown= mi->is_shutdown; + mi->start_alter_list.push_back(info, &mi->mem_root); + mysql_mutex_unlock(&mi->start_alter_list_lock); + info->state= start_alter_state::REGISTERED; + thd->rgi_slave->commit_orderer.wait_for_prior_commit(thd); + thd->rgi_slave->start_alter_ev->update_pos(thd->rgi_slave); + if (mysql_bin_log.is_open()) + { + Write_log_with_flags wlwf (thd, Gtid_log_event::FL_START_ALTER_E1); + if (write_bin_log(thd, true, thd->query(), thd->query_length())) + { + DBUG_ASSERT(thd->is_error()); + return true; + } + } + thd->rgi_slave->mark_start_commit(); + thd->wakeup_subsequent_commits(0); + thd->rgi_slave->finish_start_alter_event_group(); + + if (is_shutdown) + { + /* SA exists abruptly and will notify any CA|RA waiter. */ + mysql_mutex_lock(&mi->start_alter_lock); + /* + If there is (or will be) unlikely any CA it will execute + the whole query before to stop itself. + */ + info->direct_commit_alter= true; + info->state= start_alter_state::ROLLBACK_ALTER; + mysql_mutex_unlock(&mi->start_alter_lock); + + return true; + } + + return false; + } +#endif + +#ifndef WITH_WSREP + rpl_group_info *rgi= thd->rgi_slave ? thd->rgi_slave : thd->rgi_fake; +#else + rpl_group_info *rgi= thd->slave_thread ? thd->rgi_slave : + WSREP(thd) ? (thd->wsrep_rgi ? thd->wsrep_rgi : thd->rgi_fake) : + thd->rgi_fake; +#endif + + if (!rgi && thd->variables.binlog_alter_two_phase) + { + /* slave applier can handle here only regular ALTER */ + DBUG_ASSERT(!rgi || !(rgi->gtid_ev_flags_extra & + (Gtid_log_event::FL_START_ALTER_E1 | + Gtid_log_event::FL_COMMIT_ALTER_E1 | + Gtid_log_event::FL_ROLLBACK_ALTER_E1))); + + /* + After logging binlog state stays flagged with SA flags3 an seq_no. + The state is not reset after write_bin_log() is done which is + deferred for the second logging phase. + */ + thd->set_binlog_flags_for_alter(Gtid_log_event::FL_START_ALTER_E1); + if(write_bin_log_with_if_exists(thd, false, false, if_exists, false)) + { + DBUG_ASSERT(thd->is_error()); + + thd->set_binlog_flags_for_alter(0); + return true; + } + partial_alter= true; + } + else if (rgi && rgi->direct_commit_alter) + { + DBUG_ASSERT(rgi->gtid_ev_flags_extra & + Gtid_log_event::FL_COMMIT_ALTER_E1); + + partial_alter= true; + } + + return false; +} + bool LOGGER::is_log_table_enabled(uint log_table_type) { switch (log_table_type) { @@ -2665,12 +2790,11 @@ static void setup_windows_event_source() static int find_uniq_filename(char *name, ulong min_log_number_to_use, ulong *last_used_log_number) { - uint i; char buff[FN_REFLEN], ext_buf[FN_REFLEN]; struct st_my_dir *dir_info; struct fileinfo *file_info; ulong max_found= 0, next= 0, number= 0; - size_t buf_length, length; + size_t i, buf_length, length; char *start, *end; int error= 0; DBUG_ENTER("find_uniq_filename"); @@ -3001,7 +3125,7 @@ int MYSQL_BIN_LOG::generate_new_name(char *new_name, const char *log_name, fn_format(new_name, log_name, mysql_data_home, "", 4); if (!fn_ext(log_name)[0]) { - if (DBUG_EVALUATE_IF("binlog_inject_new_name_error", TRUE, FALSE) || + if (DBUG_IF("binlog_inject_new_name_error") || unlikely(find_uniq_filename(new_name, next_log_number, &last_used_log_number))) { @@ -3572,7 +3696,7 @@ bool MYSQL_BIN_LOG::open_index_file(const char *index_file_name_arg, mysql_file_seek(index_file_nr, 0L, MY_SEEK_END, MYF(0)), 0, MYF(MY_WME | MY_WAIT_IF_FULL), m_key_file_log_index_cache) || - DBUG_EVALUATE_IF("fault_injection_openning_index", 1, 0)) + DBUG_IF("fault_injection_openning_index")) { /* TODO: all operations creating/deleting the index file or a log, should @@ -3598,7 +3722,7 @@ bool MYSQL_BIN_LOG::open_index_file(const char *index_file_name_arg, open_purge_index_file(FALSE) || purge_index_entry(NULL, NULL, need_mutex) || close_purge_index_file() || - DBUG_EVALUATE_IF("fault_injection_recovering_index", 1, 0)) + DBUG_IF("fault_injection_recovering_index")) { sql_print_error("MYSQL_BIN_LOG::open_index_file failed to sync the index " "file."); @@ -3667,7 +3791,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, if (open_purge_index_file(TRUE) || register_create_index_entry(log_file_name) || sync_purge_index_file() || - DBUG_EVALUATE_IF("fault_injection_registering_index", 1, 0)) + DBUG_IF("fault_injection_registering_index")) { /** TODO: @@ -3949,7 +4073,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, As this is a new log file, we write the file name to the index file. As every time we write to the index file, we sync it. */ - if (DBUG_EVALUATE_IF("fault_injection_updating_index", 1, 0) || + if (DBUG_IF("fault_injection_updating_index") || my_b_write(&index_file, (uchar*) log_file_name, strlen(log_file_name)) || my_b_write(&index_file, (uchar*) "\n", 1) || @@ -5362,8 +5486,8 @@ int MYSQL_BIN_LOG::new_file_impl() r.checksum_alg= relay_log_checksum_alg; DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); - if (DBUG_EVALUATE_IF("fault_injection_new_file_rotate_event", - (error= close_on_error= TRUE), FALSE) || + if ((DBUG_IF("fault_injection_new_file_rotate_event") && + (error= close_on_error= TRUE)) || (error= write_event(&r))) { DBUG_EXECUTE_IF("fault_injection_new_file_rotate_event", errno= 2;); @@ -5799,6 +5923,39 @@ binlog_cache_mngr *THD::binlog_setup_trx_data() DBUG_RETURN(cache_mngr); } + +/* + Two phase logged ALTER getter and setter methods. +*/ +uchar THD::get_binlog_flags_for_alter() +{ + return mysql_bin_log.is_open() ? binlog_setup_trx_data()->gtid_flags3 : 0; +} + +void THD::set_binlog_flags_for_alter(uchar flags) +{ + if (mysql_bin_log.is_open()) + { + // SA must find the flag set empty + DBUG_ASSERT(flags != Gtid_log_event::FL_START_ALTER_E1 || + binlog_setup_trx_data()->gtid_flags3 == 0); + + binlog_setup_trx_data()->gtid_flags3= flags; + } +} + +uint64 THD::get_binlog_start_alter_seq_no() +{ + return mysql_bin_log.is_open() ? binlog_setup_trx_data()->sa_seq_no : 0; +} + +void THD::set_binlog_start_alter_seq_no(uint64 s_no) +{ + if (mysql_bin_log.is_open()) + binlog_setup_trx_data()->sa_seq_no= s_no; +} + + /* Function to start a statement and optionally a transaction for the binary log. @@ -6329,6 +6486,8 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, DBUG_RETURN(true); thd->set_last_commit_gtid(gtid); + if (thd->get_binlog_flags_for_alter() & Gtid_log_event::FL_START_ALTER_E1) + thd->set_binlog_start_alter_seq_no(gtid.seq_no); Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone, LOG_EVENT_SUPPRESS_USE_F, is_transactional, @@ -6776,7 +6935,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) Write the event. */ if (write_event(event_info, cache_data, file) || - DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0)) + DBUG_IF("injecting_fault_writing")) goto err; error= 0; @@ -7558,9 +7717,9 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd) if (likely(is_open())) { prev_binlog_id= current_binlog_id; - if (likely( - !(error= DBUG_EVALUATE_IF("incident_event_write_error", 1, - write_incident_already_locked(thd)))) && + if (likely(!(error= DBUG_IF("incident_event_write_error") + ? 1 + : write_incident_already_locked(thd))) && likely(!(error= flush_and_sync(0)))) { update_binlog_end_pos(); @@ -8553,7 +8712,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) DEBUG_SYNC(leader->thd, "commit_loop_entry_commit_ordered"); ++num_commits; if (current->cache_mngr->using_xa && likely(!current->error) && - DBUG_EVALUATE_IF("skip_commit_ordered", 0, 1)) + !DBUG_IF("skip_commit_ordered")) { mysql_mutex_lock(¤t->thd->LOCK_thd_data); run_commit_ordered(current->thd, current->all); |