diff options
Diffstat (limited to 'sql/log.cc')
-rw-r--r-- | sql/log.cc | 498 |
1 files changed, 341 insertions, 157 deletions
diff --git a/sql/log.cc b/sql/log.cc index d119b88c4e0..480e3b696cc 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -55,10 +55,14 @@ #include "sql_show.h" #include "my_pthread.h" #include "semisync_master.h" -#include "wsrep_mysqld.h" #include "sp_rcontext.h" #include "sp_head.h" +#include "wsrep_mysqld.h" +#ifdef WITH_WSREP +#include "wsrep_trans_observer.h" +#endif /* WITH_WSREP */ + /* max size of the log message */ #define MAX_LOG_BUFFER_SIZE 1024 #define MAX_TIME_SIZE 32 @@ -869,10 +873,10 @@ bool Log_to_csv_event_handler:: Open_tables_backup open_tables_backup; CHARSET_INFO *client_cs= thd->variables.character_set_client; bool save_time_zone_used; - long query_time= (long) MY_MIN(query_utime/1000000, TIME_MAX_VALUE_SECONDS); - long lock_time= (long) MY_MIN(lock_utime/1000000, TIME_MAX_VALUE_SECONDS); - long query_time_micro= (long) (query_utime % 1000000); - long lock_time_micro= (long) (lock_utime % 1000000); + ulong query_time= (ulong) MY_MIN(query_utime/1000000, TIME_MAX_VALUE_SECONDS); + ulong lock_time= (ulong) MY_MIN(lock_utime/1000000, TIME_MAX_VALUE_SECONDS); + ulong query_time_micro= (ulong) (query_utime % 1000000); + ulong lock_time_micro= (ulong) (lock_utime % 1000000); DBUG_ENTER("Log_to_csv_event_handler::log_slow"); @@ -1165,6 +1169,10 @@ bool LOGGER::error_log_print(enum loglevel level, const char *format, { bool error= FALSE; Log_event_handler **current_handler; + THD *thd= current_thd; + + if (likely(thd)) + thd->error_printed_to_log= 1; /* currently we don't need locking here as there is no error_log table */ for (current_handler= error_log_handler_list ; *current_handler ;) @@ -1702,7 +1710,7 @@ static int binlog_close_connection(handlerton *hton, THD *thd) (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); #ifdef WITH_WSREP if (cache_mngr && !cache_mngr->trx_cache.empty()) { - IO_CACHE* cache= get_trans_log(thd); + IO_CACHE* cache= cache_mngr->get_binlog_cache_log(true); uchar *buf; size_t len=0; wsrep_write_cache_buf(cache, &buf, &len); @@ -2186,18 +2194,30 @@ void MYSQL_BIN_LOG::set_write_error(THD *thd, bool is_transactional) { if (is_transactional) { - my_message(ER_TRANS_CACHE_FULL, ER_THD(thd, ER_TRANS_CACHE_FULL), MYF(MY_WME)); + my_message(ER_TRANS_CACHE_FULL, ER_THD(thd, ER_TRANS_CACHE_FULL), MYF(0)); } else { - my_message(ER_STMT_CACHE_FULL, ER_THD(thd, ER_STMT_CACHE_FULL), MYF(MY_WME)); + my_message(ER_STMT_CACHE_FULL, ER_THD(thd, ER_STMT_CACHE_FULL), MYF(0)); } } else { - my_error(ER_ERROR_ON_WRITE, MYF(MY_WME), name, errno); + my_error(ER_ERROR_ON_WRITE, MYF(0), name, errno); } - +#ifdef WITH_WSREP + /* If wsrep transaction is active and binlog emulation is on, + binlog write error may leave transaction without any registered + htons. This makes wsrep rollback hooks to be skipped and the + transaction will remain alive in wsrep world after rollback. + Register binlog hton here to ensure that rollback happens in full. */ + if (WSREP_EMULATE_BINLOG(thd)) + { + if (is_transactional) + trans_register_ha(thd, TRUE, binlog_hton); + trans_register_ha(thd, FALSE, binlog_hton); + } +#endif /* WITH_WSREP */ DBUG_VOID_RETURN; } @@ -2290,8 +2310,17 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) non-transactional table. Otherwise, truncate the binlog cache starting from the SAVEPOINT command. */ +#ifdef WITH_WSREP + /* for streaming replication, we must replicate savepoint rollback so that + slaves can maintain SR transactions + */ + if (unlikely(thd->wsrep_trx().is_streaming() || + (trans_has_updated_non_trans_table(thd)) || + (thd->variables.option_bits & OPTION_KEEP_LOG))) +#else if (unlikely(trans_has_updated_non_trans_table(thd) || (thd->variables.option_bits & OPTION_KEEP_LOG))) +#endif /* WITH_WSREP */ { char buf[1024]; String log_query(buf, sizeof(buf), &my_charset_bin); @@ -2417,7 +2446,7 @@ static void setup_windows_event_source() // Create the event source registry key dwError= RegCreateKey(HKEY_LOCAL_MACHINE, - "SYSTEM\\CurrentControlSet\\Services\\EventLog\\Application\\MySQL", + "SYSTEM\\CurrentControlSet\\Services\\EventLog\\Application\\MariaDB", &hRegKey); /* Name of the PE module that contains the message resource */ @@ -2449,11 +2478,19 @@ static void setup_windows_event_source() exceeds FN_REFLEN; (ii) if the number of extensions is exhausted; or (iii) some other error happened while examining the filesystem. + @param name Base name of file + @param min_log_number_to_use minimum log number to choose. Set by + CHANGE MASTER .. TO + @param last_used_log_number If 0, find log number based on files. + If not 0, then use *last_used_log_number +1 + Will be update to new generated number @return + 0 ok nonzero if not possible to get unique filename. */ -static int find_uniq_filename(char *name, ulong next_log_number) +static int find_uniq_filename(char *name, ulong min_log_number_to_use, + ulong *last_used_log_number) { uint i; char buff[FN_REFLEN], ext_buf[FN_REFLEN]; @@ -2472,24 +2509,34 @@ static int find_uniq_filename(char *name, ulong next_log_number) *end='.'; length= (size_t) (end - start + 1); - if ((DBUG_EVALUATE_IF("error_unique_log_filename", 1, - unlikely(!(dir_info= my_dir(buff, - MYF(MY_DONT_SORT))))))) - { // This shouldn't happen - strmov(end,".1"); // use name+1 - DBUG_RETURN(1); - } - file_info= dir_info->dir_entry; - max_found= next_log_number ? next_log_number-1 : 0; - for (i= dir_info->number_of_files ; i-- ; file_info++) + /* The following matches the code for my_dir () below */ + DBUG_EXECUTE_IF("error_unique_log_filename", + { + strmov(end,".1"); + DBUG_RETURN(1); + }); + + if (*last_used_log_number) + max_found= *last_used_log_number; + else { - if (strncmp(file_info->name, start, length) == 0 && - test_if_number(file_info->name+length, &number,0)) + if (unlikely(!(dir_info= my_dir(buff, MYF(MY_DONT_SORT))))) + { // This shouldn't happen + strmov(end,".1"); // use name+1 + DBUG_RETURN(1); + } + file_info= dir_info->dir_entry; + max_found= min_log_number_to_use ? min_log_number_to_use-1 : 0; + for (i= dir_info->number_of_files ; i-- ; file_info++) { - set_if_bigger(max_found, number); + if (strncmp(file_info->name, start, length) == 0 && + test_if_number(file_info->name+length, &number,0)) + { + set_if_bigger(max_found, number); + } } + my_dirend(dir_info); } - my_dirend(dir_info); /* check if reached the maximum possible extension number */ if (max_found >= MAX_LOG_UNIQUE_FN_EXT) @@ -2528,6 +2575,7 @@ index files.", name, ext_buf, (strlen(ext_buf) + (end - name))); error= 1; goto end; } + *last_used_log_number= next; /* print warning if reaching the end of available extensions. */ if ((next > (MAX_LOG_UNIQUE_FN_EXT - LOG_WARN_UNIQUE_FN_EXT_LEFT))) @@ -2646,7 +2694,7 @@ bool MYSQL_LOG::open( #endif if ((file= mysql_file_open(log_file_key, log_file_name, open_flags, - MYF(MY_WME | ME_WAITTANG))) < 0) + MYF(MY_WME))) < 0) goto err; if (is_fifo) @@ -2779,19 +2827,24 @@ int MYSQL_LOG::generate_new_name(char *new_name, const char *log_name, ulong next_log_number) { fn_format(new_name, log_name, mysql_data_home, "", 4); - if (log_type == LOG_BIN) + return 0; +} + +int MYSQL_BIN_LOG::generate_new_name(char *new_name, const char *log_name, + ulong next_log_number) +{ + fn_format(new_name, log_name, mysql_data_home, "", 4); + if (!fn_ext(log_name)[0]) { - if (!fn_ext(log_name)[0]) + if (DBUG_EVALUATE_IF("binlog_inject_new_name_error", TRUE, FALSE) || + unlikely(find_uniq_filename(new_name, next_log_number, + &last_used_log_number))) { - if (DBUG_EVALUATE_IF("binlog_inject_new_name_error", TRUE, FALSE) || - unlikely(find_uniq_filename(new_name, next_log_number))) - { - THD *thd= current_thd; - if (unlikely(thd)) - my_error(ER_NO_UNIQUE_LOGFILE, MYF(ME_FATALERROR), log_name); - sql_print_error(ER_DEFAULT(ER_NO_UNIQUE_LOGFILE), log_name); - return 1; - } + THD *thd= current_thd; + if (unlikely(thd)) + my_error(ER_NO_UNIQUE_LOGFILE, MYF(ME_FATAL), log_name); + sql_print_error(ER_DEFAULT(ER_NO_UNIQUE_LOGFILE), log_name); + return 1; } } return 0; @@ -3191,7 +3244,8 @@ const char *MYSQL_LOG::generate_name(const char *log_name, MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period) :reset_master_pending(0), mark_xid_done_waiting(0), - bytes_written(0), file_id(1), open_count(1), + bytes_written(0), last_used_log_number(0), + file_id(1), open_count(1), group_commit_queue(0), group_commit_queue_busy(FALSE), num_commits(0), num_group_commits(0), group_commit_trigger_count(0), group_commit_trigger_timeout(0), @@ -4167,6 +4221,8 @@ bool MYSQL_BIN_LOG::reset_logs(THD *thd, bool create_new_log, name=0; // Protect against free close(LOG_CLOSE_TO_BE_OPENED); + last_used_log_number= 0; // Reset log number cache + /* First delete all old log files and then update the index file. As we first delete the log files and do not use sort of logging, @@ -4620,7 +4676,7 @@ int MYSQL_BIN_LOG::open_purge_index_file(bool destroy) if (!my_b_inited(&purge_index_file)) { if ((file= my_open(purge_index_file_name, O_RDWR | O_CREAT | O_BINARY, - MYF(MY_WME | ME_WAITTANG))) < 0 || + MYF(MY_WME))) < 0 || init_io_cache(&purge_index_file, file, IO_SIZE, (destroy ? WRITE_CACHE : READ_CACHE), 0, 0, MYF(MY_WME | MY_NABP | MY_WAIT_IF_FULL))) @@ -5102,7 +5158,11 @@ bool MYSQL_BIN_LOG::is_active(const char *log_file_name_arg) int MYSQL_BIN_LOG::new_file() { - return new_file_impl(1); + int res; + mysql_mutex_lock(&LOCK_log); + res= new_file_impl(); + mysql_mutex_unlock(&LOCK_log); + return res; } /* @@ -5111,7 +5171,7 @@ int MYSQL_BIN_LOG::new_file() */ int MYSQL_BIN_LOG::new_file_without_locking() { - return new_file_impl(0); + return new_file_impl(); } @@ -5127,7 +5187,7 @@ int MYSQL_BIN_LOG::new_file_without_locking() The new file name is stored last in the index file */ -int MYSQL_BIN_LOG::new_file_impl(bool need_lock) +int MYSQL_BIN_LOG::new_file_impl() { int error= 0, close_on_error= FALSE; char new_name[FN_REFLEN], *new_name_ptr, *old_name, *file_to_open; @@ -5136,14 +5196,12 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) File UNINIT_VAR(old_file); DBUG_ENTER("MYSQL_BIN_LOG::new_file_impl"); - if (need_lock) - mysql_mutex_lock(&LOCK_log); + DBUG_ASSERT(log_type == LOG_BIN); mysql_mutex_assert_owner(&LOCK_log); if (!is_open()) { DBUG_PRINT("info",("log is closed")); - mysql_mutex_unlock(&LOCK_log); DBUG_RETURN(error); } @@ -5162,7 +5220,7 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) #ifdef ENABLE_AND_FIX_HANG close_on_error= TRUE; #endif - goto end; + goto end2; } new_name_ptr=new_name; @@ -5189,7 +5247,7 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) close_on_error= TRUE; my_printf_error(ER_ERROR_ON_WRITE, ER_THD_OR_DEFAULT(current_thd, ER_CANT_OPEN_FILE), - MYF(ME_FATALERROR), name, errno); + MYF(ME_FATAL), name, errno); goto end; } bytes_written += r.data_written; @@ -5258,14 +5316,21 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) /* handle reopening errors */ if (unlikely(error)) { - my_error(ER_CANT_OPEN_FILE, MYF(ME_FATALERROR), file_to_open, error); + my_error(ER_CANT_OPEN_FILE, MYF(ME_FATAL), file_to_open, error); close_on_error= TRUE; } my_free(old_name); end: + /* In case of errors, reuse the last generated log file name */ + if (unlikely(error)) + { + DBUG_ASSERT(last_used_log_number > 0); + last_used_log_number--; + } +end2: if (delay_close) { clear_inuse_flag_when_closing(old_file); @@ -5291,8 +5356,6 @@ end: } mysql_mutex_unlock(&LOCK_index); - if (need_lock) - mysql_mutex_unlock(&LOCK_log); DBUG_RETURN(error); } @@ -5652,7 +5715,18 @@ THD::binlog_start_trans_and_stmt() this->binlog_set_stmt_begin(); bool mstmt_mode= in_multi_stmt_transaction_mode(); #ifdef WITH_WSREP - /* Write Gtid + /* + With wsrep binlog emulation we can skip the rest because the + binlog cache will not be written into binlog. Note however that + because of this the hton callbacks will not get called to clean + up the cache, so this must be done explicitly when the transaction + terminates. + */ + if (WSREP_EMULATE_BINLOG_NNULL(this)) + { + DBUG_VOID_RETURN; + } + /* Write Gtid Get domain id only when gtid mode is set If this event is replicate through a master then , we will forward the same gtid another nodes @@ -5757,10 +5831,10 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional, /* Ensure that all events in a GTID group are in the same cache */ if (variables.option_bits & OPTION_GTID_BEGIN) is_transactional= 1; - + /* Pre-conditions */ DBUG_ASSERT(is_current_stmt_binlog_format_row()); - DBUG_ASSERT(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open()); + DBUG_ASSERT(WSREP_EMULATE_BINLOG_NNULL(this) || mysql_bin_log.is_open()); DBUG_ASSERT(table->s->table_map_id != ULONG_MAX); Table_map_log_event @@ -5958,7 +6032,9 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, DBUG_PRINT("enter", ("standalone: %d", standalone)); #ifdef WITH_WSREP - if (WSREP(thd) && thd->wsrep_trx_meta.gtid.seqno != -1 && wsrep_gtid_mode && !thd->variables.gtid_seq_no) + if (WSREP(thd) && + (wsrep_thd_trx_seqno(thd) > 0) && + wsrep_gtid_mode && !thd->variables.gtid_seq_no) { domain_id= wsrep_gtid_domain_id; } else { @@ -6061,7 +6137,7 @@ MYSQL_BIN_LOG::write_state_to_file() goto end; err: - sql_print_error("Error writing binlog state to file '%s'.\n", buf); + sql_print_error("Error writing binlog state to file '%s'.", buf); if (log_inited) end_io_cache(&cache); end: @@ -6121,7 +6197,7 @@ MYSQL_BIN_LOG::read_state_from_file() goto end; err: - sql_print_error("Error reading binlog GTID state from file '%s'.\n", buf); + sql_print_error("Error reading binlog GTID state from file '%s'.", buf); end: if (log_inited) end_io_cache(&cache); @@ -6275,7 +6351,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) */ /* applier and replayer can skip writing binlog events */ if ((WSREP_EMULATE_BINLOG(thd) && - IF_WSREP(thd->wsrep_exec_mode != REPL_RECV, 0)) || is_open()) + IF_WSREP(thd->wsrep_cs().mode() == wsrep::client_state::m_local, 0)) || is_open()) { my_off_t UNINIT_VAR(my_org_b_tell); #ifdef HAVE_REPLICATION @@ -6306,11 +6382,25 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) if (direct) { + /* We come here only for incident events */ int res; uint64 commit_id= 0; + MDL_request mdl_request; DBUG_PRINT("info", ("direct is set")); + DBUG_ASSERT(!thd->backup_commit_lock); + + mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, MDL_EXPLICIT); + thd->mdl_context.acquire_lock(&mdl_request, + thd->variables.lock_wait_timeout); + thd->backup_commit_lock= &mdl_request; + if ((res= thd->wait_for_prior_commit())) + { + if (mdl_request.ticket) + thd->mdl_context.release_lock(mdl_request.ticket); + thd->backup_commit_lock= 0; DBUG_RETURN(res); + } file= &log_file; my_org_b_tell= my_b_tell(file); mysql_mutex_lock(&LOCK_log); @@ -6325,7 +6415,11 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) commit_name.length); commit_id= entry->val_int(&null_value); }); - if (write_gtid_event(thd, true, using_trans, commit_id)) + res= write_gtid_event(thd, true, using_trans, commit_id); + if (mdl_request.ticket) + thd->mdl_context.release_lock(mdl_request.ticket); + thd->backup_commit_lock= 0; + if (res) goto err; } else @@ -6466,25 +6560,8 @@ err: it's list before dump-thread tries to send it */ update_binlog_end_pos(offset); - /* - If a transaction with the LOAD DATA statement is divided - into logical mini-transactions (of the 10K rows) and binlog - is rotated, then the last portion of data may be lost due to - wsrep handler re-registration at the boundary of the split. - Since splitting of the LOAD DATA into mini-transactions is - logical, we should not allow these mini-transactions to fall - into separate binlogs. Therefore, it is necessary to prohibit - the rotation of binlog in the middle of processing LOAD DATA: - */ -#ifdef WITH_WSREP - if (!thd->wsrep_split_flag) - { -#endif /* WITH_WSREP */ if (unlikely((error= rotate(false, &check_purge)))) check_purge= false; -#ifdef WITH_WSREP - } -#endif /* WITH_WSREP */ } } } @@ -7211,25 +7288,8 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd) likely(!(error= flush_and_sync(0)))) { update_binlog_end_pos(); - /* - If a transaction with the LOAD DATA statement is divided - into logical mini-transactions (of the 10K rows) and binlog - is rotated, then the last portion of data may be lost due to - wsrep handler re-registration at the boundary of the split. - Since splitting of the LOAD DATA into mini-transactions is - logical, we should not allow these mini-transactions to fall - into separate binlogs. Therefore, it is necessary to prohibit - the rotation of binlog in the middle of processing LOAD DATA: - */ -#ifdef WITH_WSREP - if (!thd->wsrep_split_flag) - { -#endif /* WITH_WSREP */ if (unlikely((error= rotate(false, &check_purge)))) check_purge= false; -#ifdef WITH_WSREP - } -#endif /* WITH_WSREP */ } offset= my_b_tell(&log_file); @@ -7280,7 +7340,7 @@ MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name_arg ability to do crash recovery - crash recovery will just have to scan a bit more of the binlog than strictly necessary. */ - sql_print_error("Failed to write binlog checkpoint event to binary log\n"); + sql_print_error("Failed to write binlog checkpoint event to binary log"); } offset= my_b_tell(&log_file); @@ -7421,7 +7481,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) group_commit_entry *entry, *orig_queue, *last; wait_for_commit *cur; wait_for_commit *wfc; + bool backup_lock_released= 0; + int result= 0; + THD *thd= orig_entry->thd; DBUG_ENTER("MYSQL_BIN_LOG::queue_for_group_commit"); + DBUG_ASSERT(thd == current_thd); /* Check if we need to wait for another transaction to commit before us. @@ -7433,8 +7497,10 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) */ wfc= orig_entry->thd->wait_for_commit_ptr; orig_entry->queued_by_other= false; - if (wfc && wfc->waitee) + if (wfc && wfc->waitee.load(std::memory_order_acquire)) { + wait_for_commit *loc_waitee; + mysql_mutex_lock(&wfc->LOCK_wait_commit); /* Do an extra check here, this time safely under lock. @@ -7446,10 +7512,25 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) before setting the flag, so there is no risk that we can queue ahead of it. */ - if (wfc->waitee && !wfc->waitee->commit_started) + if ((loc_waitee= wfc->waitee.load(std::memory_order_relaxed)) && + !loc_waitee->commit_started) { PSI_stage_info old_stage; - wait_for_commit *loc_waitee; + + /* + Release MDL_BACKUP_COMMIT LOCK while waiting for other threads to + commit. + This is needed to avoid deadlock between the other threads (which not + yet have the MDL_BACKUP_COMMIT_LOCK) and any threads using + BACKUP LOCK BLOCK_COMMIT. + */ + if (thd->backup_commit_lock && thd->backup_commit_lock->ticket && + !backup_lock_released) + { + backup_lock_released= 1; + thd->mdl_context.release_lock(thd->backup_commit_lock->ticket); + thd->backup_commit_lock->ticket= 0; + } /* By setting wfc->opaque_pointer to our own entry, we mark that we are @@ -7471,7 +7552,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) &wfc->LOCK_wait_commit, &stage_waiting_for_prior_transaction_to_commit, &old_stage); - while ((loc_waitee= wfc->waitee) && !orig_entry->thd->check_killed(1)) + while ((loc_waitee= wfc->waitee.load(std::memory_order_relaxed)) && + !orig_entry->thd->check_killed(1)) mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit); wfc->opaque_pointer= NULL; DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d", @@ -7489,14 +7571,18 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) do { mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit); - } while (wfc->waitee); + } while (wfc->waitee.load(std::memory_order_relaxed)); } else { /* We were killed, so remove us from the list of waitee. */ wfc->remove_from_list(&loc_waitee->subsequent_commits_list); mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); - wfc->waitee= NULL; + /* + This is the thread clearing its own status, it is no longer on + the list of waiters. So no memory barriers are needed here. + */ + wfc->waitee.store(NULL, std::memory_order_relaxed); orig_entry->thd->EXIT_COND(&old_stage); /* Interrupted by kill. */ @@ -7506,7 +7592,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) wfc->wakeup_error= ER_QUERY_INTERRUPTED; my_message(wfc->wakeup_error, ER_THD(orig_entry->thd, wfc->wakeup_error), MYF(0)); - DBUG_RETURN(-1); + result= -1; + goto end; } } orig_entry->thd->EXIT_COND(&old_stage); @@ -7520,12 +7607,13 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) then there is nothing else to do. */ if (orig_entry->queued_by_other) - DBUG_RETURN(0); + goto end; if (wfc && wfc->wakeup_error) { my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); - DBUG_RETURN(-1); + result= -1; + goto end; } /* Now enqueue ourselves in the group commit queue. */ @@ -7686,14 +7774,39 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) DBUG_PRINT("info", ("Queued for group commit as %s", (orig_queue == NULL) ? "leader" : "participant")); - DBUG_RETURN(orig_queue == NULL); + result= orig_queue == NULL; + +end: + if (backup_lock_released) + thd->mdl_context.acquire_lock(thd->backup_commit_lock, + thd->variables.lock_wait_timeout); + DBUG_RETURN(result); } bool MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) { int is_leader= queue_for_group_commit(entry); - +#ifdef WITH_WSREP + if (wsrep_is_active(entry->thd) && + wsrep_run_commit_hook(entry->thd, entry->all)) + { + /* + Release commit order and if leader, wait for prior commit to + complete. This establishes total order for group leaders. + */ + if (wsrep_ordered_commit(entry->thd, entry->all, wsrep_apply_error())) + { + entry->thd->wakeup_subsequent_commits(1); + return 1; + } + if (is_leader) + { + if (entry->thd->wait_for_prior_commit()) + return 1; + } + } +#endif /* WITH_WSREP */ /* The first in the queue handles group commit for all; the others just wait to be signalled when group commit is done. @@ -7775,10 +7888,10 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) switch (entry->error) { case ER_ERROR_ON_WRITE: - my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, entry->commit_errno); + my_error(ER_ERROR_ON_WRITE, MYF(ME_ERROR_LOG), name, entry->commit_errno); break; case ER_ERROR_ON_READ: - my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH), + my_error(ER_ERROR_ON_READ, MYF(ME_ERROR_LOG), entry->error_cache->file_name, entry->commit_errno); break; default: @@ -7789,7 +7902,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) */ my_printf_error(entry->error, "Error writing transaction to binary log: %d", - MYF(ME_NOREFRESH), entry->error); + MYF(ME_ERROR_LOG), entry->error); } /* @@ -7998,20 +8111,6 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) mark_xids_active(binlog_id, xid_count); } - /* - If a transaction with the LOAD DATA statement is divided - into logical mini-transactions (of the 10K rows) and binlog - is rotated, then the last portion of data may be lost due to - wsrep handler re-registration at the boundary of the split. - Since splitting of the LOAD DATA into mini-transactions is - logical, we should not allow these mini-transactions to fall - into separate binlogs. Therefore, it is necessary to prohibit - the rotation of binlog in the middle of processing LOAD DATA: - */ -#ifdef WITH_WSREP - if (!leader->thd->wsrep_split_flag) - { -#endif /* WITH_WSREP */ if (rotate(false, &check_purge)) { /* @@ -8028,12 +8127,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) when the transaction has been safely committed in the engine. */ leader->cache_mngr->delayed_error= true; - my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, errno); + my_error(ER_ERROR_ON_WRITE, MYF(ME_ERROR_LOG), name, errno); check_purge= false; } -#ifdef WITH_WSREP - } -#endif /* WITH_WSREP */ /* In case of binlog rotate, update the correct current binlog offset. */ commit_offset= my_b_write_tell(&log_file); } @@ -8682,18 +8778,35 @@ bool flush_error_log() } #ifdef _WIN32 +struct eventlog_source +{ + HANDLE handle; + eventlog_source() + { + setup_windows_event_source(); + handle = RegisterEventSource(NULL, "MariaDB"); + } + + ~eventlog_source() + { + if (handle) + DeregisterEventSource(handle); + } +}; + +static eventlog_source eventlog; + static void print_buffer_to_nt_eventlog(enum loglevel level, char *buff, size_t length, size_t buffLen) { - HANDLE event; + HANDLE event= eventlog.handle; char *buffptr= buff; DBUG_ENTER("print_buffer_to_nt_eventlog"); /* Add ending CR/LF's to string, overwrite last chars if necessary */ strmov(buffptr+MY_MIN(length, buffLen-5), "\r\n\r\n"); - setup_windows_event_source(); - if ((event= RegisterEventSource(NULL,"MySQL"))) + if (event) { switch (level) { case ERROR_LEVEL: @@ -8709,7 +8822,6 @@ static void print_buffer_to_nt_eventlog(enum loglevel level, char *buff, 0, (LPCSTR*) &buffptr, NULL); break; } - DeregisterEventSource(event); } DBUG_VOID_RETURN; @@ -10510,7 +10622,7 @@ set_binlog_snapshot_file(const char *src) Copy out current values of status variables, for SHOW STATUS or information_schema.global_status. - This is called only under LOCK_show_status, so we can fill in a static array. + This is called only under LOCK_all_status_vars, so we can fill in a static array. */ void TC_LOG_BINLOG::set_status_variables(THD *thd) @@ -10635,7 +10747,9 @@ maria_declare_plugin(binlog) maria_declare_plugin_end; #ifdef WITH_WSREP -IO_CACHE * get_trans_log(THD * thd) +#include "wsrep_mysqld.h" + +IO_CACHE *wsrep_get_trans_cache(THD * thd) { DBUG_ASSERT(binlog_hton->slot != HA_SLOT_UNDEF); binlog_cache_mngr *cache_mngr = (binlog_cache_mngr*) @@ -10648,45 +10762,115 @@ IO_CACHE * get_trans_log(THD * thd) return NULL; } - -bool wsrep_trans_cache_is_empty(THD *thd) -{ - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - return (!cache_mngr || cache_mngr->trx_cache.empty()); -} - - -void thd_binlog_trx_reset(THD * thd) +void wsrep_thd_binlog_trx_reset(THD * thd) { + DBUG_ENTER("wsrep_thd_binlog_trx_reset"); + WSREP_DEBUG("wsrep_thd_binlog_reset"); /* todo: fix autocommit select to not call the caller */ - if (thd_get_ha_data(thd, binlog_hton) != NULL) + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + if (cache_mngr) { - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - if (cache_mngr) + cache_mngr->reset(false, true); + if (!cache_mngr->stmt_cache.empty()) { - cache_mngr->reset(false, true); - if (!cache_mngr->stmt_cache.empty()) - { - WSREP_DEBUG("pending events in stmt cache, sql: %s", thd->query()); - cache_mngr->stmt_cache.reset(); - } + WSREP_DEBUG("pending events in stmt cache, sql: %s", thd->query()); + cache_mngr->stmt_cache.reset(); } } thd->clear_binlog_table_maps(); + DBUG_VOID_RETURN; } - -void thd_binlog_rollback_stmt(THD * thd) +void wsrep_thd_binlog_stmt_rollback(THD * thd) { - WSREP_DEBUG("thd_binlog_rollback_stmt connection: %llu", - thd->thread_id); + DBUG_ENTER("wsrep_thd_binlog_stmt_rollback"); + WSREP_DEBUG("wsrep_thd_binlog_stmt_rollback"); binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); if (cache_mngr) - cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); + { + thd->binlog_remove_pending_rows_event(TRUE, TRUE); + cache_mngr->stmt_cache.reset(); + } + DBUG_VOID_RETURN; +} + +bool wsrep_stmt_rollback_is_safe(THD* thd) +{ + bool ret(true); + + DBUG_ENTER("wsrep_binlog_stmt_rollback_is_safe"); + + binlog_cache_mngr *cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + + + if (binlog_hton && cache_mngr) + { + binlog_cache_data * trx_cache = &cache_mngr->trx_cache; + if (thd->wsrep_sr().fragments_certified() > 0 && + (trx_cache->get_prev_position() == MY_OFF_T_UNDEF || + trx_cache->get_prev_position() < thd->wsrep_sr().log_position())) + { + WSREP_DEBUG("statement rollback is not safe for streaming replication" + " pre-stmt_pos: %llu, frag repl pos: %zu\n" + "Thread: %llu, SQL: %s", + trx_cache->get_prev_position(), + thd->wsrep_sr().log_position(), + thd->thread_id, thd->query()); + ret = false; + } + } + DBUG_RETURN(ret); } + +void wsrep_register_binlog_handler(THD *thd, bool trx) +{ + DBUG_ENTER("register_binlog_handler"); + /* + If this is the first call to this function while processing a statement, + the transactional cache does not have a savepoint defined. So, in what + follows: + . an implicit savepoint is defined; + . callbacks are registered; + . binary log is set as read/write. + + The savepoint allows for truncating the trx-cache transactional changes + fail. Callbacks are necessary to flush caches upon committing or rolling + back a statement or a transaction. However, notifications do not happen + if the binary log is set as read/write. + */ + binlog_cache_mngr *cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + /* cache_mngr may be missing e.g. in mtr test ev51914.test */ + if (cache_mngr) + { + /* + Set an implicit savepoint in order to be able to truncate a trx-cache. + */ + if (cache_mngr->trx_cache.get_prev_position() == MY_OFF_T_UNDEF) + { + my_off_t pos= 0; + binlog_trans_log_savepos(thd, &pos); + cache_mngr->trx_cache.set_prev_position(pos); + } + + /* + Set callbacks in order to be able to call commmit or rollback. + */ + if (trx) + trans_register_ha(thd, TRUE, binlog_hton); + trans_register_ha(thd, FALSE, binlog_hton); + + /* + Set the binary log as read/write otherwise callbacks are not called. + */ + thd->ha_data[binlog_hton->slot].ha_info[0].set_trx_read_write(); + } + DBUG_VOID_RETURN; +} + #endif /* WITH_WSREP */ |