diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 274 |
1 files changed, 212 insertions, 62 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 6c9cfc250c5..99bddb7b9b0 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -16,10 +16,9 @@ #include "mysql_priv.h" -#ifdef HAVE_REPLICATION - #include <mysql.h> #include <myisam.h> +#include "rpl_rli.h" #include "slave.h" #include "sql_repl.h" #include "rpl_filter.h" @@ -28,6 +27,10 @@ #include <my_dir.h> #include <sql_common.h> +#ifdef HAVE_REPLICATION + +#include "rpl_tblmap.h" + #define MAX_SLAVE_RETRY_PAUSE 5 bool use_slave_mask = 0; MY_BITMAP slave_error_mask; @@ -48,8 +51,6 @@ ulonglong relay_log_space_limit = 0; */ int disconnect_slave_event_count = 0, abort_slave_event_count = 0; -int events_till_abort = -1; -static int events_till_disconnect = -1; typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE; @@ -860,19 +861,48 @@ static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli) { DBUG_ASSERT(rli->sql_thd == thd); DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun - return rli->abort_slave || abort_loop || thd->killed; + if (abort_loop || thd->killed || rli->abort_slave) + { + /* + If we are in an unsafe situation (stopping could corrupt replication), + we give one minute to the slave SQL thread of grace before really + terminating, in the hope that it will be able to read more events and + the unsafe situation will soon be left. Note that this one minute starts + from the last time anything happened in the slave SQL thread. So it's + really one minute of idleness, we don't timeout if the slave SQL thread + is actively working. + */ + if (!rli->unsafe_to_stop_at) + return 1; + DBUG_PRINT("info", ("Slave SQL thread is in an unsafe situation, giving " + "it some grace period")); + if (difftime(time(0), rli->unsafe_to_stop_at) > 60) + { + slave_print_msg(ERROR_LEVEL, rli, 0, + "SQL thread had to stop in an unsafe situation, in " + "the middle of applying updates to a " + "non-transactional table without any primary key. " + "There is a risk of duplicate updates when the slave " + "SQL thread is restarted. Please check your tables' " + "contents after restart."); + return 1; + } + } + return 0; } /* - Writes an error message to rli->last_slave_error and rli->last_slave_errno - (which will be displayed by SHOW SLAVE STATUS), and prints it to stderr. + Writes a message to stderr, and if it's an error message, to + rli->last_slave_error and rli->last_slave_errno (which will be displayed by + SHOW SLAVE STATUS). SYNOPSIS - slave_print_error() - rli + slave_print_msg() + level The severity level + rli err_code The error code - msg The error message (usually related to the error code, but can + msg The message (usually related to the error code, but can contain more information). ... (this is printf-like format, with % symbols in msg) @@ -880,22 +910,47 @@ static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli) void */ -void slave_print_error(RELAY_LOG_INFO* rli, int err_code, const char* msg, ...) +void slave_print_msg(enum loglevel level, RELAY_LOG_INFO* rli, + int err_code, const char* msg, ...) { + void (*report_function)(const char *, ...); + char buff[MAX_SLAVE_ERRMSG], *pbuff= buff; + uint pbuffsize= sizeof(buff); va_list args; va_start(args,msg); - my_vsnprintf(rli->last_slave_error, - sizeof(rli->last_slave_error), msg, args); - rli->last_slave_errno = err_code; - /* If the error string ends with '.', do not add a ',' it would be ugly */ - if (rli->last_slave_error[0] && - (*(strend(rli->last_slave_error)-1) == '.')) - sql_print_error("Slave: %s Error_code: %d", rli->last_slave_error, - err_code); + switch (level) + { + case ERROR_LEVEL: + /* + This my_error call only has effect in client threads. + Slave threads do nothing in my_error(). + */ + my_error(ER_UNKNOWN_ERROR, MYF(0), msg); + /* + It's an error, it must be reported in Last_error and Last_errno in SHOW + SLAVE STATUS. + */ + pbuff= rli->last_slave_error; + pbuffsize= sizeof(rli->last_slave_error); + rli->last_slave_errno = err_code; + report_function= sql_print_error; + break; + case WARNING_LEVEL: + report_function= sql_print_warning; + break; + case INFORMATION_LEVEL: + report_function= sql_print_information; + break; + default: + DBUG_ASSERT(0); // should not come here + return; // don't crash production builds, just do nothing + } + my_vsnprintf(pbuff, pbuffsize, msg, args); + /* If the msg string ends with '.', do not add a ',' it would be ugly */ + if (pbuff[0] && (*(strend(pbuff)-1) == '.')) + (*report_function)("Slave: %s Error_code: %d", pbuff, err_code); else - sql_print_error("Slave: %s, Error_code: %d", rli->last_slave_error, - err_code); - + (*report_function)("Slave: %s, Error_code: %d", pbuff, err_code); } /* @@ -919,7 +974,6 @@ bool net_request_file(NET* net, const char* fname) DBUG_RETURN(net_write_command(net, 251, fname, strlen(fname), "", 0)); } - /* From other comments and tests in code, it looks like sometimes Query_log_event and Load_log_event can have db == 0 @@ -932,7 +986,6 @@ const char *print_slave_db_safe(const char* db) return (db ? db : ""); } - static int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, const char *default_val) { @@ -1379,6 +1432,7 @@ static int init_relay_log_info(RELAY_LOG_INFO* rli, const char* msg = 0; int error = 0; DBUG_ENTER("init_relay_log_info"); + DBUG_ASSERT(!rli->no_storage); // Don't init if there is no storage if (rli->inited) // Set if this function called DBUG_RETURN(0); @@ -1674,7 +1728,7 @@ static void write_ignored_events_info_to_relay_log(THD *thd, MASTER_INFO *mi) if (rli->ign_master_log_name_end[0]) { DBUG_PRINT("info",("writing a Rotate event to track down ignored events")); - Rotate_log_event *ev= new Rotate_log_event(thd, rli->ign_master_log_name_end, + Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end, 0, rli->ign_master_log_pos_end, Rotate_log_event::DUP_NAME); rli->ign_master_log_name_end[0]= 0; @@ -2241,17 +2295,17 @@ bool flush_master_info(MASTER_INFO* mi, bool flush_relay_log_cache) st_relay_log_info::st_relay_log_info() - :info_fd(-1), cur_log_fd(-1), save_temporary_tables(0), + :no_storage(FALSE), info_fd(-1), cur_log_fd(-1), save_temporary_tables(0), cur_log_old_open_count(0), group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0), last_master_timestamp(0), slave_skip_counter(0), abort_pos_wait(0), slave_run_id(0), sql_thd(0), last_slave_errno(0), inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), - until_log_pos(0), retried_trans(0) + until_log_pos(0), retried_trans(0), m_reload_flags(RELOAD_NONE_F), + unsafe_to_stop_at(0) { group_relay_log_name[0]= event_relay_log_name[0]= group_master_log_name[0]= 0; last_slave_error[0]= until_log_name[0]= ign_master_log_name_end[0]= 0; - bzero((char*) &info_file, sizeof(info_file)); bzero((char*) &cache_buf, sizeof(cache_buf)); cached_charset_invalidate(); @@ -2671,11 +2725,9 @@ static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings) /* my_real_read() will time us out We check if we were told to die, and if not, try reading again - - TODO: Move 'events_till_disconnect' to the MASTER_INFO structure */ #ifndef DBUG_OFF - if (disconnect_slave_event_count && !(events_till_disconnect--)) + if (disconnect_slave_event_count && !(mi->events_till_disconnect--)) return packet_error; #endif @@ -2950,7 +3002,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) thd->lex->current_select= 0; if (!ev->when) ev->when = time(NULL); - ev->thd = thd; + ev->thd = thd; // because up to this point, ev->thd == 0 exec_res = ev->exec_event(rli); DBUG_ASSERT(rli->sql_thd==thd); /* @@ -3022,7 +3074,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) else { pthread_mutex_unlock(&rli->data_lock); - slave_print_error(rli, 0, "\ + slave_print_msg(ERROR_LEVEL, rli, 0, "\ Could not parse relay log event entry. The possible reasons are: the master's \ binary log is corrupted (you can check this by running 'mysqlbinlog' on the \ binary log), the slave's relay log is corrupted (you can check this by running \ @@ -3051,9 +3103,6 @@ pthread_handler_t handle_slave_io(void *arg) my_thread_init(); DBUG_ENTER("handle_slave_io"); -#ifndef DBUG_OFF -slave_begin: -#endif DBUG_ASSERT(mi->inited); mysql= NULL ; retry_count= 0; @@ -3063,7 +3112,7 @@ slave_begin: mi->slave_run_id++; #ifndef DBUG_OFF - mi->events_till_abort = abort_slave_event_count; + mi->events_till_disconnect = disconnect_slave_event_count; #endif thd= new THD; // note that contructor of THD uses DBUG_ ! @@ -3301,14 +3350,6 @@ ignore_log_space_limit=%d", log space"); goto err; } - // TODO: check debugging abort code -#ifndef DBUG_OFF - if (abort_slave_event_count && !--events_till_abort) - { - sql_print_error("Slave I/O thread: debugging abort"); - goto err; - } -#endif } } @@ -3347,10 +3388,6 @@ err: pthread_mutex_unlock(&LOCK_thread_count); pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done pthread_mutex_unlock(&mi->run_lock); -#ifndef DBUG_OFF - if (abort_slave_event_count && !events_till_abort) - goto slave_begin; -#endif my_thread_end(); pthread_exit(0); DBUG_RETURN(0); // Can't return anything here @@ -3370,10 +3407,6 @@ pthread_handler_t handle_slave_sql(void *arg) my_thread_init(); DBUG_ENTER("handle_slave_sql"); -#ifndef DBUG_OFF -slave_begin: -#endif - DBUG_ASSERT(rli->inited); pthread_mutex_lock(&rli->run_lock); DBUG_ASSERT(!rli->slave_running); @@ -3520,6 +3553,14 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff)); err: + + /* + Some events set some playgrounds, which won't be cleared because thread + stops. Stopping of this thread may not be known to these events ("stop" + request is detected only by the present function, not by events), so we + must "proactively" clear playgrounds: + */ + rli->cleanup_context(thd, 1); VOID(pthread_mutex_lock(&LOCK_thread_count)); /* Some extra safety, which should not been needed (normally, event deletion @@ -3565,10 +3606,6 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ pthread_cond_broadcast(&rli->stop_cond); // tell the world we are done pthread_mutex_unlock(&rli->run_lock); -#ifndef DBUG_OFF // TODO: reconsider the code below - if (abort_slave_event_count && !rli->events_till_abort) - goto slave_begin; -#endif my_thread_end(); pthread_exit(0); DBUG_RETURN(0); // Can't return anything here @@ -3721,7 +3758,7 @@ static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev) rotate event forever, so we need to not disconnect after one. */ if (disconnect_slave_event_count) - events_till_disconnect++; + mi->events_till_disconnect++; #endif /* @@ -4177,7 +4214,7 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, DBUG_ENTER("connect_to_master"); #ifndef DBUG_OFF - events_till_disconnect = disconnect_slave_event_count; + mi->events_till_disconnect = disconnect_slave_event_count; #endif ulong client_flag= CLIENT_REMEMBER_OPTIONS; if (opt_slave_compressed_protocol) @@ -4311,6 +4348,10 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi, bool flush_relay_log_info(RELAY_LOG_INFO* rli) { bool error=0; + + if (unlikely(rli->no_storage)) + return 0; + IO_CACHE *file = &rli->info_file; char buff[FN_REFLEN*2+22*2+4], *pos; @@ -4327,6 +4368,7 @@ bool flush_relay_log_info(RELAY_LOG_INFO* rli) error=1; if (flush_io_cache(file)) error=1; + /* Flushing the relay log is done by the slave I/O thread */ return error; } @@ -4357,7 +4399,7 @@ static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg) } -Log_event* next_event(RELAY_LOG_INFO* rli) +static Log_event* next_event(RELAY_LOG_INFO* rli) { Log_event* ev; IO_CACHE* cur_log = rli->cur_log; @@ -4368,6 +4410,11 @@ Log_event* next_event(RELAY_LOG_INFO* rli) DBUG_ENTER("next_event"); DBUG_ASSERT(thd != 0); +#ifndef DBUG_OFF + if (abort_slave_event_count && !rli->events_till_abort--) + DBUG_RETURN(0); +#endif + /* For most operations we need to protect rli members with data_lock, so we assume calling function acquired this mutex for us and we will @@ -4489,7 +4536,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli) { /* We generate and return a Rotate, to make our positions advance */ DBUG_PRINT("info",("seeing an ignored end segment")); - ev= new Rotate_log_event(thd, rli->ign_master_log_name_end, + ev= new Rotate_log_event(rli->ign_master_log_name_end, 0, rli->ign_master_log_pos_end, Rotate_log_event::DUP_NAME); rli->ign_master_log_name_end[0]= 0; @@ -4737,11 +4784,114 @@ end: DBUG_VOID_RETURN; } +/* + Some system tables needed to be re-read by the MySQL server after it has + updated them; in statement-based replication, the GRANT and other commands + are sent verbatim to the slave which then reloads; in row-based replication, + changes to these tables are done through ordinary Rows binlog events, so + master must add some flag for the slave to know it has to reload the tables. +*/ +struct st_reload_entry +{ + char const *table; + st_relay_log_info::enum_reload_flag flag; +}; + +/* + Sorted array of table names, please keep it sorted since we are + using bsearch() on it below. + */ +static st_reload_entry s_mysql_tables[] = +{ + { "columns_priv", st_relay_log_info::RELOAD_GRANT_F }, + { "db", st_relay_log_info::RELOAD_ACCESS_F }, + { "host", st_relay_log_info::RELOAD_ACCESS_F }, + { "procs_priv", st_relay_log_info::RELOAD_GRANT_F }, + { "tables_priv", st_relay_log_info::RELOAD_GRANT_F }, + { "user", st_relay_log_info::RELOAD_ACCESS_F } +}; + +static const my_size_t s_mysql_tables_size = + sizeof(s_mysql_tables)/sizeof(*s_mysql_tables); + +static int reload_entry_compare(const void *lhs, const void *rhs) +{ + const char *lstr = static_cast<const char *>(lhs); + const char *rstr = static_cast<const st_reload_entry*>(rhs)->table; + return strcmp(lstr, rstr); +} + +void st_relay_log_info::touching_table(char const* db, char const* table, + ulong table_id) +{ + if (strcmp(db,"mysql") == 0) + { +#if defined(HAVE_BSEARCH) && defined(HAVE_SIZE_T) + void *const ptr= bsearch(table, s_mysql_tables, + s_mysql_tables_size, + sizeof(*s_mysql_tables), reload_entry_compare); + st_reload_entry const *const entry= static_cast<st_reload_entry*>(ptr); +#else + /* + Fall back to full scan, there are few rows anyway and updating the + "mysql" database is rare. + */ + st_reload_entry const *entry= s_mysql_tables; + for ( ; entry < s_mysql_tables + s_mysql_tables_size ; entry++) + if (reload_entry_compare(table, entry) == 0) + break; +#endif + if (entry) + m_reload_flags|= entry->flag; + } +} + +void st_relay_log_info::transaction_end(THD* thd) +{ + if (m_reload_flags != RELOAD_NONE_F) + { + if (m_reload_flags & RELOAD_ACCESS_F) + acl_reload(thd); + + if (m_reload_flags & RELOAD_GRANT_F) + grant_reload(thd); + + m_reload_flags= RELOAD_NONE_F; + } +} + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +void st_relay_log_info::cleanup_context(THD *thd, bool error) +{ + DBUG_ASSERT(sql_thd == thd); + /* + 1) Instances of Table_map_log_event, if ::exec_event() was called on them, + may have opened tables, which we cannot be sure have been closed (because + maybe the Rows_log_event have not been found or will not be, because slave + SQL thread is stopping, or relay log has a missing tail etc). So we close + all thread's tables. And so the table mappings have to be cancelled. + 2) Rows_log_event::exec_event() may even have started statements or + transactions on them, which we need to rollback in case of error. + 3) If finding a Format_description_log_event after a BEGIN, we also need + to rollback before continuing with the next events. + 4) so we need this "context cleanup" function. + */ + if (error) + { + ha_autocommit_or_rollback(thd, 1); // if a "statement transaction" + end_trans(thd, ROLLBACK); // if a "real transaction" + } + m_table_map.clear_tables(); + close_thread_tables(thd); + unsafe_to_stop_at= 0; +} +#endif + #ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION template class I_List_iterator<i_string>; template class I_List_iterator<i_string_pair>; #endif - #endif /* HAVE_REPLICATION */ + |