diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 800 |
1 files changed, 657 insertions, 143 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 2ff1a0490e9..f64ab0d015f 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -25,7 +25,7 @@ replication slave. */ -#include <my_global.h> +#include "mariadb.h" #include "sql_priv.h" #include "slave.h" #include "sql_parse.h" // execute_init_command @@ -43,7 +43,6 @@ #include <ssl_compat.h> #include "unireg.h" #include <mysys_err.h> -#include "rpl_handler.h" #include <signal.h> #include <mysql.h> #include <myisam.h> @@ -60,6 +59,8 @@ #include "rpl_tblmap.h" #include "debug_sync.h" #include "rpl_parallel.h" +#include "sql_show.h" +#include "semisync_slave.h" #include "sql_manager.h" #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") @@ -72,6 +73,9 @@ bool use_slave_mask = 0; MY_BITMAP slave_error_mask; char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE]; +uint *slave_transaction_retry_errors; +uint slave_transaction_retry_error_length= 0; +char slave_transaction_retry_error_names[SHOW_VAR_FUNC_BUFF_SIZE]; char* slave_load_tmpdir = 0; Master_info *active_mi= 0; @@ -83,7 +87,7 @@ ulonglong opt_read_binlog_speed_limit = 0; const char *relay_log_index= 0; const char *relay_log_basename= 0; -LEX_STRING default_master_connection_name= { (char*) "", 0 }; +LEX_CSTRING default_master_connection_name= { (char*) "", 0 }; /* When slave thread exits, we need to remember the temporary tables so we @@ -156,7 +160,8 @@ static bool wait_for_relay_log_space(Relay_log_info* rli); static bool io_slave_killed(Master_info* mi); static bool sql_slave_killed(rpl_group_info *rgi); static int init_slave_thread(THD*, Master_info *, SLAVE_THD_TYPE); -static void print_slave_skip_errors(void); +static void make_slave_skip_errors_printable(void); +static void make_slave_transaction_retry_errors_printable(void); static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi); static int safe_reconnect(THD*, MYSQL*, Master_info*, bool); static int connect_to_master(THD*, MYSQL*, Master_info*, bool, bool); @@ -280,13 +285,181 @@ static void init_slave_psi_keys(void) 
#endif /* HAVE_PSI_INTERFACE */ -static bool slave_background_thread_gtid_loaded; +/* + Note: This definition needs to be kept in sync with the one in + mysql_system_tables.sql which is used by mysql_create_db. +*/ +static const char gtid_pos_table_definition1[]= + "CREATE TABLE "; +static const char gtid_pos_table_definition2[]= + " (domain_id INT UNSIGNED NOT NULL, " + "sub_id BIGINT UNSIGNED NOT NULL, " + "server_id INT UNSIGNED NOT NULL, " + "seq_no BIGINT UNSIGNED NOT NULL, " + "PRIMARY KEY (domain_id, sub_id)) CHARSET=latin1 " + "COMMENT='Replication slave GTID position' " + "ENGINE="; + +/* + Build a query string + CREATE TABLE mysql.gtid_slave_pos_<engine> ... ENGINE=<engine> +*/ +static bool +build_gtid_pos_create_query(THD *thd, String *query, + LEX_CSTRING *table_name, + LEX_CSTRING *engine_name) +{ + bool err= false; /* Becomes true if any append below fails */ + err|= query->append(gtid_pos_table_definition1); + err|= append_identifier(thd, query, table_name); + err|= query->append(gtid_pos_table_definition2); + err|= append_identifier(thd, query, engine_name); + return err; +} + + +static int +gtid_pos_table_creation(THD *thd, plugin_ref engine, LEX_CSTRING *table_name) +{ + int err; /* Returned to the caller: 0 on success, non-zero on failure */ + StringBuffer<sizeof(gtid_pos_table_definition1) + + sizeof(gtid_pos_table_definition2) + + 2*FN_REFLEN> query; /* Inline room for both fragments plus two identifiers */ + + if (build_gtid_pos_create_query(thd, &query, table_name, plugin_name(engine))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; + } + + thd->set_db(&MYSQL_SCHEMA_NAME); + thd->clear_error(); + ulonglong thd_saved_option= thd->variables.option_bits; + /* This query should not be binlogged. 
*/ + thd->variables.option_bits&= ~(ulonglong)OPTION_BIN_LOG; + thd->set_query_and_id(query.c_ptr(), query.length(), thd->charset(), + next_query_id()); + Parser_state parser_state; + err= parser_state.init(thd, thd->query(), thd->query_length()); + if (err) + goto end; + mysql_parse(thd, thd->query(), thd->query_length(), &parser_state, + FALSE, FALSE); + if (unlikely(thd->is_error())) + err= 1; + /* The warning is relevant to 10.3 and earlier. */ + sql_print_warning("The automatically created table '%s' name may not be " + "entirely in lowercase. The table name will be converted " + "to lowercase to any future upgrade to 10.4.0 and later " + "version where it will be auto-created at once " + "in lowercase.", + table_name->str); +end: + thd->variables.option_bits= thd_saved_option; + thd->reset_query(); + return err; +} + + +static void bg_gtid_pos_auto_create(void *hton) +{ + THD *thd= NULL; + int UNINIT_VAR(err); + plugin_ref engine= NULL, *auto_engines; + rpl_slave_state::gtid_pos_table *entry; + StringBuffer<FN_REFLEN> loc_table_name; + LEX_CSTRING table_name; + + /* + Check that the plugin is still in @@gtid_pos_auto_engines, and lock + it. + */ + mysql_mutex_lock(&LOCK_global_system_variables); + for (auto_engines= opt_gtid_pos_auto_plugins; + auto_engines && *auto_engines; + ++auto_engines) + { + if (plugin_hton(*auto_engines) == hton) + { + engine= my_plugin_lock(NULL, *auto_engines); + break; + } + } + mysql_mutex_unlock(&LOCK_global_system_variables); + if (!engine) + { + /* The engine is gone from @@gtid_pos_auto_engines, so no action. */ + goto end; + } + + /* Find the entry for the table to auto-create. 
*/ + mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); + entry= (rpl_slave_state::gtid_pos_table *) + rpl_global_gtid_slave_state->gtid_pos_tables; + while (entry) + { + if (entry->table_hton == hton && + entry->state == rpl_slave_state::GTID_POS_CREATE_REQUESTED) + break; + entry= entry->next; + } + if (entry) + { + entry->state = rpl_slave_state::GTID_POS_CREATE_IN_PROGRESS; + err= loc_table_name.append(entry->table_name.str, entry->table_name.length); + } + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + if (!entry) + goto end; + if (err) + { + sql_print_error("Out of memory while trying to auto-create GTID position table"); + goto end; + } + table_name.str= loc_table_name.c_ptr_safe(); + table_name.length= loc_table_name.length(); + + thd= new THD(next_thread_id()); + thd->thread_stack= (char*) &thd; /* Set approximate stack start */ + thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND; + thd->store_globals(); + thd->security_ctx->skip_grants(); + thd->set_command(COM_DAEMON); + thd->variables.wsrep_on= 0; + err= gtid_pos_table_creation(thd, engine, &table_name); + if (err) + { + sql_print_error("Error auto-creating GTID position table `mysql.%s`: %s Error_code: %d", + table_name.str, thd->get_stmt_da()->message(), + thd->get_stmt_da()->sql_errno()); + thd->clear_error(); + goto end; + } + + /* Now enable the entry for the auto-created table. 
*/ + mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); + entry= (rpl_slave_state::gtid_pos_table *) + rpl_global_gtid_slave_state->gtid_pos_tables; + while (entry) + { + if (entry->table_hton == hton && + entry->state == rpl_slave_state::GTID_POS_CREATE_IN_PROGRESS) + { + entry->state= rpl_slave_state::GTID_POS_AVAILABLE; + break; + } + entry= entry->next; + } + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + +end: + delete thd; + if (engine) + plugin_unlock(NULL, engine); +} -struct slave_background_kill_t { - slave_background_kill_t *next; - THD *to_kill; -} *slave_background_kill_list; +static bool slave_background_thread_gtid_loaded; static void bg_rpl_load_gtid_slave_state(void *) { @@ -317,9 +490,7 @@ static void bg_rpl_load_gtid_slave_state(void *) static void bg_slave_kill(void *victim) { THD *to_kill= (THD *)victim; - mysql_mutex_lock(&to_kill->LOCK_thd_data); to_kill->awake(KILL_CONNECTION); - mysql_mutex_unlock(&to_kill->LOCK_thd_data); mysql_mutex_lock(&to_kill->LOCK_wakeup_ready); to_kill->rgi_slave->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED; mysql_cond_broadcast(&to_kill->COND_wakeup_ready); @@ -334,6 +505,30 @@ void slave_background_kill_request(THD *to_kill) mysql_manager_submit(bg_slave_kill, to_kill); } +/* + This function must only be called from a slave SQL thread (or worker thread), + to ensure that the table_entry will not go away before we can lock the + LOCK_slave_state. 
+*/ +void +slave_background_gtid_pos_create_request( + rpl_slave_state::gtid_pos_table *table_entry) +{ + if (table_entry->state != rpl_slave_state::GTID_POS_AUTO_CREATE) + return; + mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); + if (table_entry->state != rpl_slave_state::GTID_POS_AUTO_CREATE) + { + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + return; + } + table_entry->state= rpl_slave_state::GTID_POS_CREATE_REQUESTED; + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + + mysql_manager_submit(bg_gtid_pos_auto_create, table_entry->table_hton); +} + + /* Initialize slave structures */ int init_slave() @@ -391,15 +586,6 @@ int init_slave() } /* - If --slave-skip-errors=... was not used, the string value for the - system variable has not been set up yet. Do it now. - */ - if (!use_slave_mask) - { - print_slave_skip_errors(); - } - - /* If master_host is not specified, try to read it from the master_info file. If master_host is specified, create the master_info file if it doesn't exists. @@ -431,7 +617,7 @@ int init_slave() thd->reset_globals(); delete thd; - if (error) + if (unlikely(error)) { sql_print_error("Failed to create slave threads"); goto err; @@ -496,12 +682,12 @@ int init_recovery(Master_info* mi, const char** errmsg) DBUG_RETURN(0); } - + /** Convert slave skip errors bitmap into a printable string. */ -static void print_slave_skip_errors(void) +static void make_slave_skip_errors_printable(void) { /* To be safe, we want 10 characters of room in the buffer for a number @@ -510,7 +696,7 @@ static void print_slave_skip_errors(void) plus a NUL terminator. That is a max 6 digit number. 
*/ const size_t MIN_ROOM= 10; - DBUG_ENTER("print_slave_skip_errors"); + DBUG_ENTER("make_slave_skip_errors_printable"); DBUG_ASSERT(sizeof(slave_skip_error_names) > MIN_ROOM); DBUG_ASSERT(MAX_SLAVE_ERROR <= 999999); // 6 digits @@ -532,14 +718,14 @@ static void print_slave_skip_errors(void) else { char *buff= slave_skip_error_names; - char *bend= buff + sizeof(slave_skip_error_names); + char *bend= buff + sizeof(slave_skip_error_names) - MIN_ROOM; int errnum; for (errnum= 0; errnum < MAX_SLAVE_ERROR; errnum++) { if (bitmap_is_set(&slave_error_mask, errnum)) { - if (buff + MIN_ROOM >= bend) + if (buff >= bend) break; /* purecov: tested */ buff= int10_to_str(errnum, buff, 10); *buff++= ','; @@ -569,24 +755,24 @@ static void print_slave_skip_errors(void) Called from get_options() in mysqld.cc on start-up */ -void init_slave_skip_errors(const char* arg) +bool init_slave_skip_errors(const char* arg) { const char *p; DBUG_ENTER("init_slave_skip_errors"); - if (my_bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0)) - { - fprintf(stderr, "Badly out of memory, please check your system status\n"); - exit(1); - } - use_slave_mask = 1; + if (!arg || !*arg) // No errors defined + goto end; + + if (unlikely(my_bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))) + DBUG_RETURN(1); + + use_slave_mask= 1; for (;my_isspace(system_charset_info,*arg);++arg) /* empty */; if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4)) { bitmap_set_all(&slave_error_mask); - print_slave_skip_errors(); - DBUG_VOID_RETURN; + goto end; } for (p= arg ; *p; ) { @@ -598,11 +784,109 @@ void init_slave_skip_errors(const char* arg) while (!my_isdigit(system_charset_info,*p) && *p) p++; } - /* Convert slave skip errors bitmap into a printable string. 
*/ - print_slave_skip_errors(); + +end: + make_slave_skip_errors_printable(); + DBUG_RETURN(0); +} + +/** + Make printable version of slave_transaction_retry_errors + This is never empty as at least ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT + will be there +*/ + +static void make_slave_transaction_retry_errors_printable(void) +{ + /* + To be safe, we want 10 characters of room in the buffer for a number + plus terminators. Also, we need some space for constant strings. + 10 characters must be sufficient for a number plus {',' | '...'} + plus a NUL terminator. That is a max 6 digit number. + */ + const size_t MIN_ROOM= 10; + char *buff= slave_transaction_retry_error_names; + char *bend= buff + sizeof(slave_transaction_retry_error_names) - MIN_ROOM; + uint i; + DBUG_ENTER("make_slave_transaction_retry_errors_printable"); + DBUG_ASSERT(sizeof(slave_transaction_retry_error_names) > MIN_ROOM); + + /* Make @@slave_transaction_retry_errors show a human-readable value */ + opt_slave_transaction_retry_errors= slave_transaction_retry_error_names; + + for (i= 0; i < slave_transaction_retry_error_length && buff < bend; i++) + { + buff= int10_to_str(slave_transaction_retry_errors[i], buff, 10); + *buff++= ','; + } + if (buff != slave_transaction_retry_error_names) + buff--; // Remove last ',' + if (i < slave_transaction_retry_error_length) + { + /* Couldn't show all errors */ + buff= strmov(buff, "..."); /* purecov: tested */ + } + *buff=0; /* NUL-terminate the printable list */ + DBUG_PRINT("exit", ("error_names: '%s'", + slave_transaction_retry_error_names)); DBUG_VOID_RETURN; } + +bool init_slave_transaction_retry_errors(const char* arg) +{ + const char *p; + long err_code; + uint i; + DBUG_ENTER("init_slave_transaction_retry_errors"); + + /* Handle empty strings */ + if (!arg) + arg= ""; + + slave_transaction_retry_error_length= 2; /* Slots 0 and 1 hold the two built-in codes */ + for (;my_isspace(system_charset_info,*arg);++arg) + /* empty */; + for (p= arg; *p; ) + { + if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code))) + break; + 
slave_transaction_retry_error_length++; + while (!my_isdigit(system_charset_info,*p) && *p) + p++; + } + + if (unlikely(!(slave_transaction_retry_errors= + (uint *) my_once_alloc(sizeof(int) * + slave_transaction_retry_error_length, + MYF(MY_WME))))) + DBUG_RETURN(1); + + /* + Temporary error codes: + currently, InnoDB deadlock detected by InnoDB or lock + wait timeout (innodb_lock_wait_timeout exceeded + */ + slave_transaction_retry_errors[0]= ER_LOCK_DEADLOCK; + slave_transaction_retry_errors[1]= ER_LOCK_WAIT_TIMEOUT; + + /* Add user codes after this */ + for (p= arg, i= 2; *p; ) + { + if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code))) + break; + if (err_code > 0 && err_code < ER_ERROR_LAST) + slave_transaction_retry_errors[i++]= (uint) err_code; + while (!my_isdigit(system_charset_info,*p) && *p) + p++; + } + slave_transaction_retry_error_length= i; + + make_slave_transaction_retry_errors_printable(); + DBUG_RETURN(0); +} + + int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) { DBUG_ENTER("terminate_slave_threads"); @@ -624,11 +908,12 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) } else mi->rli.abort_slave=1; - if ((error=terminate_slave_thread(mi->rli.sql_driver_thd, sql_lock, - &mi->rli.stop_cond, - &mi->rli.slave_running, - skip_lock)) && - !force_all) + if (unlikely((error= terminate_slave_thread(mi->rli.sql_driver_thd, + sql_lock, + &mi->rli.stop_cond, + &mi->rli.slave_running, + skip_lock))) && + !force_all) DBUG_RETURN(error); retval= error; @@ -646,11 +931,11 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) { DBUG_PRINT("info",("Terminating IO thread")); mi->abort_slave=1; - if ((error=terminate_slave_thread(mi->io_thd, io_lock, - &mi->stop_cond, - &mi->slave_running, - skip_lock)) && - !force_all) + if (unlikely((error= terminate_slave_thread(mi->io_thd, io_lock, + &mi->stop_cond, + &mi->slave_running, + skip_lock))) && + !force_all) DBUG_RETURN(error); if (!retval) 
retval= error; @@ -671,7 +956,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) mysql_mutex_unlock(log_lock); } - DBUG_RETURN(retval); + DBUG_RETURN(retval); } @@ -754,7 +1039,7 @@ terminate_slave_thread(THD *thd, int error __attribute__((unused)); DBUG_PRINT("loop", ("killing slave thread")); - mysql_mutex_lock(&thd->LOCK_thd_data); + mysql_mutex_lock(&thd->LOCK_thd_kill); #ifndef DONT_USE_THR_ALARM /* Error codes from pthread_kill are: @@ -764,9 +1049,9 @@ terminate_slave_thread(THD *thd, int err __attribute__((unused))= pthread_kill(thd->real_id, thr_client_alarm); DBUG_ASSERT(err != EINVAL); #endif - thd->awake(NOT_KILLED); + thd->awake_no_mutex(NOT_KILLED); - mysql_mutex_unlock(&thd->LOCK_thd_data); + mysql_mutex_unlock(&thd->LOCK_thd_kill); /* There is a small chance that slave thread might miss the first @@ -826,8 +1111,9 @@ int start_slave_thread( } start_id= *slave_run_id; DBUG_PRINT("info",("Creating new slave thread")); - if ((error = mysql_thread_create(thread_key, - &th, &connection_attrib, h_func, (void*)mi))) + if (unlikely((error= mysql_thread_create(thread_key, + &th, &connection_attrib, h_func, + (void*)mi)))) { sql_print_error("Can't create slave thread (errno= %d).", error); if (start_lock) @@ -940,7 +1226,7 @@ int start_slave_threads(THD *thd, mi->rli.restart_gtid_pos.reset(); } - if (!error && (thread_mask & SLAVE_IO)) + if (likely(!error) && likely((thread_mask & SLAVE_IO))) error= start_slave_thread( #ifdef HAVE_PSI_INTERFACE key_thread_slave_io, @@ -949,7 +1235,7 @@ int start_slave_threads(THD *thd, cond_io, &mi->slave_running, &mi->slave_run_id, mi); - if (!error && (thread_mask & SLAVE_SQL)) + if (likely(!error) && likely(thread_mask & SLAVE_SQL)) { error= start_slave_thread( #ifdef HAVE_PSI_INTERFACE @@ -959,7 +1245,7 @@ int start_slave_threads(THD *thd, cond_sql, &mi->rli.slave_running, &mi->rli.slave_run_id, mi); - if (error) + if (unlikely(error)) terminate_slave_threads(mi, thread_mask & SLAVE_IO, 
!need_slave_mutex); } DBUG_RETURN(error); @@ -1184,7 +1470,7 @@ const char *print_slave_db_safe(const char* db) int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, const char *default_val) { - uint length; + size_t length; DBUG_ENTER("init_strvar_from_file"); if ((length=my_b_gets(f,var, max_size))) @@ -1794,7 +2080,7 @@ when it try to get the value of TIME_ZONE global variable from master."; if (++dbug_count < 3) goto heartbeat_network_error; }); - if (mysql_real_query(mysql, query, strlen(query))) + if (mysql_real_query(mysql, query, (ulong)strlen(query))) { if (check_io_slave_killed(mi, NULL)) goto slave_killed_err; @@ -1842,7 +2128,7 @@ when it try to get the value of TIME_ZONE global variable from master."; Once the first FD will be received its alg descriptor will replace the being queried one. */ - rc= mysql_real_query(mysql, query, strlen(query)); + rc= mysql_real_query(mysql, query,(ulong)strlen(query)); if (rc != 0) { if (check_io_slave_killed(mi, NULL)) @@ -1931,7 +2217,8 @@ past_checksum: */ if (opt_replicate_events_marked_for_skip == RPL_SKIP_FILTER_ON_MASTER) { - if (mysql_real_query(mysql, STRING_WITH_LEN("SET skip_replication=1"))) + if (unlikely(mysql_real_query(mysql, + STRING_WITH_LEN("SET skip_replication=1")))) { err_code= mysql_errno(mysql); if (is_network_error(err_code)) @@ -1975,7 +2262,7 @@ past_checksum: STRINGIFY_ARG(MARIA_SLAVE_CAPABILITY_ANNOTATE))), mysql_real_query(mysql, STRING_WITH_LEN("SET @mariadb_slave_capability=" STRINGIFY_ARG(MARIA_SLAVE_CAPABILITY_MINE)))); - if (rc) + if (unlikely(rc)) { err_code= mysql_errno(mysql); if (is_network_error(err_code)) @@ -2051,7 +2338,7 @@ after_set_capability: query_str.append(STRING_WITH_LEN("'"), system_charset_info); rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); - if (rc) + if (unlikely(rc)) { err_code= mysql_errno(mysql); if (is_network_error(err_code)) @@ -2084,7 +2371,7 @@ after_set_capability: } rc= mysql_real_query(mysql, query_str.ptr(), 
query_str.length()); - if (rc) + if (unlikely(rc)) { err_code= mysql_errno(mysql); if (is_network_error(err_code)) @@ -2117,7 +2404,7 @@ after_set_capability: } rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); - if (rc) + if (unlikely(rc)) { err_code= mysql_errno(mysql); if (is_network_error(err_code)) @@ -2153,7 +2440,7 @@ after_set_capability: query_str.append(STRING_WITH_LEN("'"), system_charset_info); rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); - if (rc) + if (unlikely(rc)) { err_code= mysql_errno(mysql); if (is_network_error(err_code)) @@ -2423,7 +2710,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi, bool *suppress_warnings) { uchar buf[1024], *pos= buf; - uint report_host_len=0, report_user_len=0, report_password_len=0; + size_t report_host_len=0, report_user_len=0, report_password_len=0; DBUG_ENTER("register_slave_on_master"); *suppress_warnings= FALSE; @@ -2431,7 +2718,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi, report_host_len= strlen(report_host); if (report_host_len > HOSTNAME_LENGTH) { - sql_print_warning("The length of report_host is %d. " + sql_print_warning("The length of report_host is %zu. " "It is larger than the max length(%d), so this " "slave cannot be registered to the master.", report_host_len, HOSTNAME_LENGTH); @@ -2442,7 +2729,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi, report_user_len= strlen(report_user); if (report_user_len > USERNAME_LENGTH) { - sql_print_warning("The length of report_user is %d. " + sql_print_warning("The length of report_user is %zu. " "It is larger than the max length(%d), so this " "slave cannot be registered to the master.", report_user_len, USERNAME_LENGTH); @@ -2453,7 +2740,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi, report_password_len= strlen(report_password); if (report_password_len > MAX_PASSWORD_LENGTH) { - sql_print_warning("The length of report_password is %d. 
" + sql_print_warning("The length of report_password is %zu. " "It is larger than the max length(%d), so this " "slave cannot be registered to the master.", report_password_len, MAX_PASSWORD_LENGTH); @@ -2474,7 +2761,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi, /* The master will fill in master_id */ int4store(pos, 0); pos+= 4; - if (simple_command(mysql, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0)) + if (simple_command(mysql, COM_REGISTER_SLAVE, buf, (ulong) (pos- buf), 0)) { if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED) { @@ -2718,6 +3005,19 @@ void show_master_info_get_fields(THD *thd, List<Item> *field_list, field_list->push_back(new (mem_root) Item_empty_string(thd, "Slave_SQL_Running_State", 20)); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Slave_DDL_Groups", 20, + MYSQL_TYPE_LONGLONG), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Slave_Non_Transactional_Groups", 20, + MYSQL_TYPE_LONGLONG), + mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "Slave_Transactional_Groups", 20, + MYSQL_TYPE_LONGLONG), + mem_root); + if (full) { field_list->push_back(new (mem_root) @@ -2741,7 +3041,7 @@ void show_master_info_get_fields(THD *thd, List<Item> *field_list, mem_root); field_list->push_back(new (mem_root) Item_empty_string(thd, "Gtid_Slave_Pos", - gtid_pos_length), + (uint)gtid_pos_length), mem_root); } DBUG_VOID_RETURN; @@ -2951,6 +3251,17 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, // Slave_SQL_Running_State protocol->store(slave_sql_running_state, &my_charset_bin); + uint64 events; + events= (uint64)my_atomic_load64_explicit((volatile int64 *) + &mi->total_ddl_groups, MY_MEMORY_ORDER_RELAXED); + protocol->store(events); + events= (uint64)my_atomic_load64_explicit((volatile int64 *) + &mi->total_non_trans_groups, MY_MEMORY_ORDER_RELAXED); + protocol->store(events); + events= (uint64)my_atomic_load64_explicit((volatile int64 *) + 
&mi->total_trans_groups, MY_MEMORY_ORDER_RELAXED); + protocol->store(events); + if (full) { protocol->store((uint32) mi->rli.retried_trans); @@ -3068,6 +3379,13 @@ void set_slave_thread_options(THD* thd) options&= ~OPTION_BIN_LOG; thd->variables.option_bits= options; thd->variables.completion_type= 0; + + /* For easier test in LOGGER::log_command */ + if (thd->variables.log_disabled_statements & LOG_DISABLE_SLAVE) + thd->variables.option_bits|= OPTION_LOG_OFF; + + thd->variables.sql_log_slow= !MY_TEST(thd->variables.log_slow_disabled_statements & + LOG_SLOW_DISABLE_SLAVE); DBUG_VOID_RETURN; } @@ -3114,8 +3432,7 @@ static int init_slave_thread(THD* thd, Master_info *mi, thd->security_ctx->skip_grants(); thd->slave_thread= 1; thd->connection_name= mi->connection_name; - thd->variables.sql_log_slow= opt_log_slow_slave_statements; - thd->variables.log_slow_filter= global_system_variables.log_slow_filter; + thd->variables.sql_log_slow= !MY_TEST(thd->variables.log_slow_disabled_statements & LOG_SLOW_DISABLE_SLAVE); set_slave_thread_options(thd); if (thd_type == SLAVE_THD_SQL) @@ -3180,11 +3497,9 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, if (opt_log_slave_updates && opt_replicate_annotate_row_events) binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT; - if (RUN_HOOK(binlog_relay_io, - before_request_transmit, - (thd, mi, binlog_flags))) + if (repl_semisync_slave.request_transmit(mi)) DBUG_RETURN(1); - + // TODO if big log files: Change next to int8store() int4store(buf, (ulong) mi->master_log_pos); int2store(buf + 4, binlog_flags); @@ -3247,7 +3562,7 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings, #endif len = cli_safe_read_reallen(mysql, network_read_len); - if (len == packet_error || (long) len < 1) + if (unlikely(len == packet_error || (long) len < 1)) { if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED) { @@ -3256,7 +3571,8 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings, we 
suppress prints to .err file as long as the reconnect happens without problems */ - *suppress_warnings= TRUE; + *suppress_warnings= + global_system_variables.log_warnings < 2 ? TRUE : FALSE; } else { @@ -3283,14 +3599,20 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings, DBUG_RETURN(len - 1); } -/* + +/** Check if the current error is of temporary nature of not. Some errors are temporary in nature, such as ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT. + + @retval 0 if fatal error + @retval 1 temporary error, do retry */ + int has_temporary_error(THD *thd) { + uint current_errno; DBUG_ENTER("has_temporary_error"); DBUG_EXECUTE_IF("all_errors_are_temporary_errors", @@ -3305,17 +3627,15 @@ has_temporary_error(THD *thd) error or not. This is currently the case for Incident_log_event, which sets no message. Return FALSE. */ - if (!thd->is_error()) + if (!likely(thd->is_error())) DBUG_RETURN(0); - /* - Temporary error codes: - currently, InnoDB deadlock detected by InnoDB or lock - wait timeout (innodb_lock_wait_timeout exceeded - */ - if (thd->get_stmt_da()->sql_errno() == ER_LOCK_DEADLOCK || - thd->get_stmt_da()->sql_errno() == ER_LOCK_WAIT_TIMEOUT) - DBUG_RETURN(1); + current_errno= thd->get_stmt_da()->sql_errno(); + for (uint i= 0; i < slave_transaction_retry_error_length; i++) + { + if (current_errno == slave_transaction_retry_errors[i]) + DBUG_RETURN(1); + } DBUG_RETURN(0); } @@ -3542,7 +3862,7 @@ apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi, TODO: Replace this with a decent error message when merged with BUG#24954 (which adds several new error message). 
*/ - if (error) + if (unlikely(error)) { rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR, rgi->gtid_info(), "It was not possible to update the positions" @@ -3647,10 +3967,7 @@ int apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd, rpl_group_info *rgi) { -#ifndef DBUG_OFF - Relay_log_info* rli= rgi->rli; -#endif - mysql_mutex_assert_not_owner(&rli->data_lock); + mysql_mutex_assert_not_owner(&rgi->rli->data_lock); int reason= apply_event_and_update_pos_setup(ev, thd, rgi); /* In parallel replication, sql_slave_skip_counter is handled in the SQL @@ -3945,7 +4262,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, update_log_pos failed: this should not happen, so we don't retry. */ - if (exec_res == 2) + if (unlikely(exec_res == 2)) DBUG_RETURN(1); #ifdef WITH_WSREP @@ -3957,7 +4274,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, if (slave_trans_retries) { int UNINIT_VAR(temp_err); - if (exec_res && (temp_err= has_temporary_error(thd))) + if (unlikely(exec_res) && (temp_err= has_temporary_error(thd))) { const char *errmsg; rli->clear_error(); @@ -3992,8 +4309,9 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, exec_res= 0; serial_rgi->cleanup_context(thd, 1); /* chance for concurrent connection to get more locks */ - slave_sleep(thd, MY_MIN(serial_rgi->trans_retries, + slave_sleep(thd, MY_MAX(MY_MIN(serial_rgi->trans_retries, MAX_SLAVE_RETRY_PAUSE), + slave_trans_retry_interval), sql_slave_killed, serial_rgi); serial_rgi->trans_retries++; mysql_mutex_lock(&rli->data_lock); // because of SHOW STATUS @@ -4087,7 +4405,7 @@ static bool check_io_slave_killed(Master_info *mi, const char *info) @param[in] mysql MySQL connection. @param[in] mi Master connection information. @param[in,out] retry_count Number of attempts to reconnect. - @param[in] suppress_warnings TRUE when a normal net read timeout + @param[in] suppress_warnings TRUE when a normal net read timeout has caused to reconnecting. 
@param[in] messages Messages to print/log, see reconnect_messages[] array. @@ -4216,6 +4534,7 @@ pthread_handler_t handle_slave_io(void *arg) mi->abort_slave = 0; mysql_mutex_unlock(&mi->run_lock); mysql_cond_broadcast(&mi->start_cond); + mi->rows_event_tracker.reset(); DBUG_PRINT("master_info",("log_file_name: '%s' position: %llu", mi->master_log_name, mi->master_log_pos)); @@ -4240,7 +4559,8 @@ pthread_handler_t handle_slave_io(void *arg) } thd->variables.wsrep_on= 0; - if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi))) + if (DBUG_EVALUATE_IF("failed_slave_start", 1, 0) + || repl_semisync_slave.slave_start(mi)) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, ER_THD(thd, ER_SLAVE_FATAL_ERROR), @@ -4298,6 +4618,10 @@ connected: */ mi->gtid_reconnect_event_skip_count= mi->events_queued_since_last_gtid; mi->gtid_event_seen= false; + /* + Reset stale state of the rows-event group tracker at reconnect. + */ + mi->rows_event_tracker.reset(); } #ifdef ENABLED_DEBUG_SYNC @@ -4397,7 +4721,7 @@ connected: if (check_io_slave_killed(mi, NullS)) goto err; - if (event_len == packet_error) + if (unlikely(event_len == packet_error)) { uint mysql_error_number= mysql_errno(mysql); switch (mysql_error_number) { @@ -4431,9 +4755,10 @@ Stopping slave I/O thread due to out-of-memory error from master"); retry_count=0; // ok event, reset retry counter THD_STAGE_INFO(thd, stage_queueing_master_event_to_the_relay_log); event_buf= (const char*)mysql->net.read_pos + 1; - if (RUN_HOOK(binlog_relay_io, after_read_event, - (thd, mi,(const char*)mysql->net.read_pos + 1, - event_len, &event_buf, &event_len))) + mi->semi_ack= 0; + if (repl_semisync_slave. 
+ slave_read_sync_header((const char*)mysql->net.read_pos + 1, event_len, + &(mi->semi_ack), &event_buf, &event_len)) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, ER_THD(thd, ER_SLAVE_FATAL_ERROR), @@ -4482,9 +4807,6 @@ Stopping slave I/O thread due to out-of-memory error from master"); tokenamount -= network_read_len; } - /* XXX: 'synced' should be updated by queue_event to indicate - whether event has been synced to disk */ - bool synced= 0; if (queue_event(mi, event_buf, event_len)) { mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, @@ -4493,17 +4815,27 @@ Stopping slave I/O thread due to out-of-memory error from master"); goto err; } - if (RUN_HOOK(binlog_relay_io, after_queue_event, - (thd, mi, event_buf, event_len, synced))) + if (rpl_semi_sync_slave_status && (mi->semi_ack & SEMI_SYNC_NEED_ACK)) { - mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, - ER_THD(thd, ER_SLAVE_FATAL_ERROR), - "Failed to run 'after_queue_event' hook"); - goto err; + /* + We deliberately ignore the error in slave_reply, such error should + not cause the slave IO thread to stop, and the error messages are + already reported. + */ + (void)repl_semisync_slave.slave_reply(mi); } if (mi->using_gtid == Master_info::USE_GTID_NO && - flush_master_info(mi, TRUE, TRUE)) + /* + If rpl_semi_sync_slave_delay_master is enabled, we will flush + master info only when ack is needed. This may lead to at least one + group transaction delay but affords better performance improvement. 
+ */ + (!repl_semisync_slave.get_slave_enabled() || + (!(mi->semi_ack & SEMI_SYNC_SLAVE_DELAY_SYNC) || + (mi->semi_ack & (SEMI_SYNC_NEED_ACK)))) && + (DBUG_EVALUATE_IF("failed_flush_master_info", 1, 0) || + flush_master_info(mi, TRUE, TRUE))) { sql_print_error("Failed to flush master info file"); goto err; @@ -4563,9 +4895,9 @@ err: tmp.c_ptr_safe()); sql_print_information("master was %s:%d", mi->host, mi->port); } - RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi)); + repl_semisync_slave.slave_stop(mi); thd->reset_query(); - thd->reset_db(NULL, 0); + thd->reset_db(&null_clex_str); if (mysql) { /* @@ -4597,9 +4929,7 @@ err_during_init: // TODO: make rpl_status part of Master_info change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE); - mysql_mutex_lock(&LOCK_thread_count); - thd->unlink(); - mysql_mutex_unlock(&LOCK_thread_count); + thd->assert_not_linked(); delete thd; thread_safe_decrement32(&service_thread_count); signal_thd_deleted(); @@ -4701,7 +5031,7 @@ slave_output_error_info(rpl_group_info *rgi, THD *thd) Relay_log_info *rli= rgi->rli; uint32 const last_errno= rli->last_error().number; - if (thd->is_error()) + if (unlikely(thd->is_error())) { char const *const errmsg= thd->get_stmt_da()->message(); @@ -4745,7 +5075,7 @@ slave_output_error_info(rpl_group_info *rgi, THD *thd) udf_error = true; sql_print_warning("Slave: %s Error_code: %d", err->get_message_text(), err->get_sql_errno()); } - if (udf_error) + if (unlikely(udf_error)) { StringBuffer<100> tmp; if (rli->mi->using_gtid != Master_info::USE_GTID_NO) @@ -4873,6 +5203,10 @@ pthread_handler_t handle_slave_sql(void *arg) applied. In all other cases it must be FALSE. */ thd->variables.binlog_annotate_row_events= 0; + + /* Ensure that slave can exeute any alter table it gets from master */ + thd->variables.alter_algorithm= (ulong) Alter_info::ALTER_TABLE_ALGORITHM_DEFAULT; + add_to_active_threads(thd); /* We are going to set slave_running to 1. 
Assuming slave I/O thread is @@ -5013,12 +5347,20 @@ pthread_handler_t handle_slave_sql(void *arg) if (mi->using_gtid != Master_info::USE_GTID_NO || opt_gtid_strict_mode) goto err; } + /* Re-load the set of mysql.gtid_slave_posXXX tables available. */ + if (find_gtid_slave_pos_tables(thd)) + { + rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL, + "Error processing replication GTID position tables: %s", + thd->get_stmt_da()->message()); + goto err; + } /* execute init_slave variable */ if (opt_init_slave.length) { execute_init_command(thd, &opt_init_slave, &LOCK_sys_init_slave); - if (thd->is_slave_error) + if (unlikely(thd->is_slave_error)) { rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL, "Slave SQL thread aborted. Can't execute init_slave query"); @@ -5170,7 +5512,7 @@ pthread_handler_t handle_slave_sql(void *arg) */ thd->catalog= 0; thd->reset_query(); - thd->reset_db(NULL, 0); + thd->reset_db(&null_clex_str); if (rli->mi->using_gtid != Master_info::USE_GTID_NO) { ulong domain_count; @@ -5295,11 +5637,7 @@ err_during_init: rpl_parallel_resize_pool_if_no_slaves(); - /* TODO: Check if this lock is needed */ - mysql_mutex_lock(&LOCK_thread_count); delete serial_rgi; - mysql_mutex_unlock(&LOCK_thread_count); - delete thd; thread_safe_decrement32(&service_thread_count); signal_thd_deleted(); @@ -5712,7 +6050,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) { int error= 0; StringBuffer<1024> error_msg; - ulonglong inc_pos; + ulonglong inc_pos= 0; ulonglong event_pos; Relay_log_info *rli= &mi->rli; mysql_mutex_t *log_lock= rli->relay_log.get_log_lock(); @@ -5726,7 +6064,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) char* new_buf = NULL; char new_buf_arr[4096]; bool is_malloc = false; - + bool is_rows_event= false; /* FD_q must have been prepared for the first R_a event inside get_master_version_and_clock() @@ -5811,6 +6149,7 @@ static int queue_event(Master_info* mi,const char* 
buf, ulong event_len) DBUG_ASSERT(debug_sync_service); DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act))); + dbug_rows_event_count = 0; };); #endif mysql_mutex_lock(&mi->data_lock); @@ -5908,7 +6247,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mysql_mutex_unlock(log_lock); goto err; } - rli->relay_log.signal_update(); + rli->relay_log.signal_relay_log_update(); mysql_mutex_unlock(log_lock); mi->gtid_reconnect_event_skip_count= 0; @@ -6160,11 +6499,11 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) got_gtid_event= true; if (mi->using_gtid == Master_info::USE_GTID_NO) goto default_action; - if (unlikely(!mi->gtid_event_seen)) + if (unlikely(mi->gtid_reconnect_event_skip_count)) { - mi->gtid_event_seen= true; - if (mi->gtid_reconnect_event_skip_count) + if (likely(!mi->gtid_event_seen)) { + mi->gtid_event_seen= true; /* If we are reconnecting, and we need to skip a partial event group already queued to the relay log before the reconnect, then we check @@ -6193,13 +6532,45 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) rpl_slave_state_tostring_helper(&error_msg, &event_gtid, &first); goto err; } + if (global_system_variables.log_warnings > 1) + { + bool first= true; + StringBuffer<1024> gtid_text; + rpl_slave_state_tostring_helper(&gtid_text, &mi->last_queued_gtid, + &first); + sql_print_information("Slave IO thread is reconnected to " + "receive Gtid_log_event %s. 
It is to skip %llu " + "already received events including the gtid one", + gtid_text.ptr(), + mi->events_queued_since_last_gtid); + } + goto default_action; } - } + else + { + bool first; + StringBuffer<1024> gtid_text; - if (unlikely(mi->gtid_reconnect_event_skip_count)) - { - goto default_action; + gtid_text.append(STRING_WITH_LEN("Last received gtid: ")); + first= true; + rpl_slave_state_tostring_helper(&gtid_text, &mi->last_queued_gtid, + &first); + gtid_text.append(STRING_WITH_LEN(", currently received: ")); + first= true; + rpl_slave_state_tostring_helper(&gtid_text, &event_gtid, &first); + + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + sql_print_error("Slave IO thread has received a new Gtid_log_event " + "while skipping already logged events " + "after reconnect. %s. %llu remains to be skipped. " + "The number of originally read events was %llu", + gtid_text.ptr(), + mi->gtid_reconnect_event_skip_count, + mi->events_queued_since_last_gtid); + goto err; + } } + mi->gtid_event_seen= true; /* We have successfully queued to relay log everything before this GTID, so @@ -6266,8 +6637,34 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) goto err; } } - buf = new_buf; is_compress_event = true; + buf = new_buf; + /* + As we are uncertain about compressed V2 rows events, we don't track + them + */ + if (LOG_EVENT_IS_ROW_V2((Log_event_type) buf[EVENT_TYPE_OFFSET])) + goto default_action; + /* fall through */ + case WRITE_ROWS_EVENT_V1: + case UPDATE_ROWS_EVENT_V1: + case DELETE_ROWS_EVENT_V1: + case WRITE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case DELETE_ROWS_EVENT: + { + is_rows_event= true; + mi->rows_event_tracker.update(mi->master_log_name, + mi->master_log_pos, + buf, + mi->rli.relay_log. 
+ description_event_for_queue); + + DBUG_EXECUTE_IF("simulate_stmt_end_rows_event_loss", + { + mi->rows_event_tracker.stmt_end_seen= false; + }); + } goto default_action; #ifndef DBUG_OFF @@ -6337,6 +6734,21 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } /* + Integrity of Rows- event group check. + A sequence of Rows- events must end with STMT_END_F flagged one. + Even when Heartbeat event interrupts Rows- events flow this must indicate a + malfunction e.g logging on the master. + */ + if (((uchar) buf[EVENT_TYPE_OFFSET] != HEARTBEAT_LOG_EVENT) && + !is_rows_event && + mi->rows_event_tracker.check_and_report(mi->master_log_name, + mi->master_log_pos)) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + + /* If we filter events master-side (eg. @@skip_replication), we will see holes in the event positions from the master. If we see such a hole, adjust mi->master_log_pos accordingly so we maintain the correct position (for @@ -6464,7 +6876,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) if (got_gtid_event) rli->ign_gtids.update(&event_gtid); } - rli->relay_log.signal_update(); // the slave SQL thread needs to re-check + // the slave SQL thread needs to re-check + rli->relay_log.signal_relay_log_update(); DBUG_PRINT("info", ("master_log_pos: %lu, event originating from %u server, ignored", (ulong) mi->master_log_pos, uint4korr(buf + SERVER_ID_OFFSET))); } @@ -6488,7 +6901,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } mysql_mutex_unlock(log_lock); - if (!error && + if (likely(!error) && mi->using_gtid != Master_info::USE_GTID_NO && mi->events_queued_since_last_gtid > 0 && ( (mi->last_queued_gtid_standalone && @@ -6504,6 +6917,21 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) The whole of the current event group is queued. So in case of reconnect we can start from after the current GTID. 
*/ + if (mi->gtid_reconnect_event_skip_count) + { + bool first= true; + StringBuffer<1024> gtid_text; + + rpl_slave_state_tostring_helper(&gtid_text, &mi->last_queued_gtid, + &first); + sql_print_error("Slave IO thread received a terminal event from " + "group %s whose retrieval was interrupted " + "with reconnect. We still had %llu events to read. " + "The number of originally read events was %llu", + gtid_text.ptr(), + mi->gtid_reconnect_event_skip_count, + mi->events_queued_since_last_gtid); + } mi->gtid_current_pos.update(&mi->last_queued_gtid); mi->events_queued_since_last_gtid= 0; @@ -6522,11 +6950,11 @@ err: Do not print ER_SLAVE_RELAY_LOG_WRITE_FAILURE error here, as the caller handle_slave_io() prints it on return. */ - if (error && error != ER_SLAVE_RELAY_LOG_WRITE_FAILURE) + if (unlikely(error) && error != ER_SLAVE_RELAY_LOG_WRITE_FAILURE) mi->report(ERROR_LEVEL, error, NULL, ER_DEFAULT(error), error_msg.ptr()); - if(is_malloc) + if (unlikely(is_malloc)) my_free((void *)new_buf); DBUG_RETURN(error); @@ -6974,7 +7402,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size) MYSQL_BIN_LOG::open() will write the buffered description event. 
*/ old_pos= rli->event_relay_log_pos; - if ((ev= Log_event::read_log_event(cur_log,0, + if ((ev= Log_event::read_log_event(cur_log, rli->relay_log.description_event_for_exec, opt_slave_sql_verify_checksum))) @@ -6993,7 +7421,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size) } if (opt_reckless_slave) // For mysql-test cur_log->error = 0; - if (cur_log->error < 0) + if (unlikely(cur_log->error < 0)) { errmsg = "slave SQL thread aborted because of I/O error"; if (hot_log) @@ -7538,6 +7966,92 @@ bool rpl_master_erroneous_autoinc(THD *thd) return FALSE; } + +static bool get_row_event_stmt_end(const char* buf, + const Format_description_log_event *fdle) +{ + uint8 const common_header_len= fdle->common_header_len; + Log_event_type event_type= (Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET]; + + uint8 const post_header_len= fdle->post_header_len[event_type-1]; + const char *flag_start= buf + common_header_len; + /* + The term 4 below signifies that master is of 'an intermediate source', see + Rows_log_event::Rows_log_event. + */ + flag_start += RW_MAPID_OFFSET + ((post_header_len == 6) ? 4 : RW_FLAGS_OFFSET); + + return (uint2korr(flag_start) & Rows_log_event::STMT_END_F) != 0; +} + + +/* + Reset log event tracking data. +*/ + +void Rows_event_tracker::reset() +{ + binlog_file_name[0]= 0; + first_seen= last_seen= 0; + stmt_end_seen= false; +} + + +/* + Update log event tracking data. + + The first- and last- seen event binlog position get memorized, as + well as the end-of-statement status of the last one. 
+*/ + +void Rows_event_tracker::update(const char* file_name, my_off_t pos, + const char* buf, + const Format_description_log_event *fdle) +{ + if (!first_seen) + { + first_seen= pos; + strmake(binlog_file_name, file_name, sizeof(binlog_file_name) - 1); + } + last_seen= pos; + DBUG_ASSERT(stmt_end_seen == 0); // We can only have one + stmt_end_seen= get_row_event_stmt_end(buf, fdle); +}; + + +/** + The function is called at next event reading + after a sequence of Rows- log-events. It checks the end-of-statement status + of the past sequence to report on any isssue. + In the positive case the tracker gets reset. + + @return true when the Rows- event group integrity found compromised, + false otherwise. +*/ +bool Rows_event_tracker::check_and_report(const char* file_name, + my_off_t pos) +{ + if (last_seen) + { + // there was at least one "block" event previously + if (!stmt_end_seen) + { + sql_print_error("Slave IO thread did not receive an expected " + "Rows-log end-of-statement for event starting " + "at log '%s' position %llu " + "whose last block was seen at log '%s' position %llu. " + "The end-of-statement should have been delivered " + "before the current one at log '%s' position %llu", + binlog_file_name, first_seen, + binlog_file_name, last_seen, file_name, pos); + return true; + } + reset(); + } + + return false; +} + /** @} (end of group Replication) */ |