summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc800
1 files changed, 657 insertions, 143 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 2ff1a0490e9..f64ab0d015f 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -25,7 +25,7 @@
replication slave.
*/
-#include <my_global.h>
+#include "mariadb.h"
#include "sql_priv.h"
#include "slave.h"
#include "sql_parse.h" // execute_init_command
@@ -43,7 +43,6 @@
#include <ssl_compat.h>
#include "unireg.h"
#include <mysys_err.h>
-#include "rpl_handler.h"
#include <signal.h>
#include <mysql.h>
#include <myisam.h>
@@ -60,6 +59,8 @@
#include "rpl_tblmap.h"
#include "debug_sync.h"
#include "rpl_parallel.h"
+#include "sql_show.h"
+#include "semisync_slave.h"
#include "sql_manager.h"
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
@@ -72,6 +73,9 @@
bool use_slave_mask = 0;
MY_BITMAP slave_error_mask;
char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE];
+uint *slave_transaction_retry_errors;
+uint slave_transaction_retry_error_length= 0;
+char slave_transaction_retry_error_names[SHOW_VAR_FUNC_BUFF_SIZE];
char* slave_load_tmpdir = 0;
Master_info *active_mi= 0;
@@ -83,7 +87,7 @@ ulonglong opt_read_binlog_speed_limit = 0;
const char *relay_log_index= 0;
const char *relay_log_basename= 0;
-LEX_STRING default_master_connection_name= { (char*) "", 0 };
+LEX_CSTRING default_master_connection_name= { (char*) "", 0 };
/*
When slave thread exits, we need to remember the temporary tables so we
@@ -156,7 +160,8 @@ static bool wait_for_relay_log_space(Relay_log_info* rli);
static bool io_slave_killed(Master_info* mi);
static bool sql_slave_killed(rpl_group_info *rgi);
static int init_slave_thread(THD*, Master_info *, SLAVE_THD_TYPE);
-static void print_slave_skip_errors(void);
+static void make_slave_skip_errors_printable(void);
+static void make_slave_transaction_retry_errors_printable(void);
static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi);
static int safe_reconnect(THD*, MYSQL*, Master_info*, bool);
static int connect_to_master(THD*, MYSQL*, Master_info*, bool, bool);
@@ -280,13 +285,181 @@ static void init_slave_psi_keys(void)
#endif /* HAVE_PSI_INTERFACE */
-static bool slave_background_thread_gtid_loaded;
+/*
+ The two constant pieces of the CREATE TABLE statement for a GTID position
+ table. build_gtid_pos_create_query() assembles the full statement as
+ <definition1> <table name> <definition2> <engine name>
+
+ Note: This definition needs to be kept in sync with the one in
+ mysql_system_tables.sql which is used by mysql_create_db.
+*/
+static const char gtid_pos_table_definition1[]=
+ "CREATE TABLE ";
+static const char gtid_pos_table_definition2[]=
+ " (domain_id INT UNSIGNED NOT NULL, "
+ "sub_id BIGINT UNSIGNED NOT NULL, "
+ "server_id INT UNSIGNED NOT NULL, "
+ "seq_no BIGINT UNSIGNED NOT NULL, "
+ "PRIMARY KEY (domain_id, sub_id)) CHARSET=latin1 "
+ "COMMENT='Replication slave GTID position' "
+ "ENGINE=";
+
+/*
+  Assemble the statement that creates a GTID position table:
+
+    CREATE TABLE <table_name> (...) ... ENGINE=<engine_name>
+
+  Both names are added with append_identifier(). All four pieces are always
+  appended, even if an earlier append already reported a failure.
+
+  Returns true if any append step reported failure, false otherwise.
+*/
+static bool
+build_gtid_pos_create_query(THD *thd, String *query,
+                            LEX_CSTRING *table_name,
+                            LEX_CSTRING *engine_name)
+{
+  const bool prefix_failed= query->append(gtid_pos_table_definition1);
+  const bool name_failed= append_identifier(thd, query, table_name);
+  const bool columns_failed= query->append(gtid_pos_table_definition2);
+  const bool engine_failed= append_identifier(thd, query, engine_name);
+
+  return prefix_failed || name_failed || columns_failed || engine_failed;
+}
+
+
+/*
+  Create a GTID position table by executing a CREATE TABLE statement in thd.
+
+  The statement is built by build_gtid_pos_create_query() from table_name and
+  the name of the given engine plugin, then run through mysql_parse().
+  Before running it, the database of thd is set to MYSQL_SCHEMA_NAME and
+  OPTION_BIN_LOG is cleared in the option bits; the saved option bits are
+  put back and the query of thd is reset before returning.
+
+  Note that the lowercase-name warning below is logged whenever the
+  statement was parsed, whether or not the CREATE TABLE succeeded.
+
+  Returns 0 on success and non-zero on failure (the query could not be built,
+  the parser state could not be initialised, or thd is in an error state
+  after mysql_parse()).
+*/
+static int
+gtid_pos_table_creation(THD *thd, plugin_ref engine, LEX_CSTRING *table_name)
+{
+  int err;
+  /*
+    Inline size covers both constant parts of the statement plus the two
+    names (table and engine).
+  */
+  StringBuffer<sizeof(gtid_pos_table_definition1) +
+               sizeof(gtid_pos_table_definition2) +
+               2*FN_REFLEN> query;
+
+  if (build_gtid_pos_create_query(thd, &query, table_name, plugin_name(engine)))
+  {
+    my_error(ER_OUT_OF_RESOURCES, MYF(0));
+    return 1;
+  }
+
+  thd->set_db(&MYSQL_SCHEMA_NAME);
+  thd->clear_error();
+  ulonglong thd_saved_option= thd->variables.option_bits;
+  /* This query should not be binlogged. */
+  thd->variables.option_bits&= ~(ulonglong)OPTION_BIN_LOG;
+  thd->set_query_and_id(query.c_ptr(), query.length(), thd->charset(),
+                        next_query_id());
+  Parser_state parser_state;
+  err= parser_state.init(thd, thd->query(), thd->query_length());
+  if (err)
+    goto end;
+  mysql_parse(thd, thd->query(), thd->query_length(), &parser_state,
+              FALSE, FALSE);
+  if (unlikely(thd->is_error()))
+    err= 1;
+  /* The warning is relevant to 10.3 and earlier. */
+  sql_print_warning("The automatically created table '%s' name may not be "
+                    "entirely in lowercase. The table name will be converted "
+                    "to lowercase to any future upgrade to 10.4.0 and later "
+                    "version where it will be auto-created at once "
+                    "in lowercase.",
+                    table_name->str);
+end:
+  /* Restore the option bits saved above and drop the temporary query. */
+  thd->variables.option_bits= thd_saved_option;
+  thd->reset_query();
+  return err;
+}
+
+
+static void bg_gtid_pos_auto_create(void *hton)
+{
+ THD *thd= NULL;
+ int UNINIT_VAR(err);
+ plugin_ref engine= NULL, *auto_engines;
+ rpl_slave_state::gtid_pos_table *entry;
+ StringBuffer<FN_REFLEN> loc_table_name;
+ LEX_CSTRING table_name;
+
+ /*
+ Background job, submitted with mysql_manager_submit(), that auto-creates
+ the GTID position table for the storage engine identified by hton.
+ Steps: lock the engine plugin, find the table entry in state
+ GTID_POS_CREATE_REQUESTED and mark it GTID_POS_CREATE_IN_PROGRESS, run the
+ CREATE TABLE in a temporary THD, and on success mark the entry
+ GTID_POS_AVAILABLE.
+
+ Check that the plugin is still in @@gtid_pos_auto_engines, and lock
+ it.
+ */
+ mysql_mutex_lock(&LOCK_global_system_variables);
+ for (auto_engines= opt_gtid_pos_auto_plugins;
+ auto_engines && *auto_engines;
+ ++auto_engines)
+ {
+ if (plugin_hton(*auto_engines) == hton)
+ {
+ engine= my_plugin_lock(NULL, *auto_engines);
+ break;
+ }
+ }
+ mysql_mutex_unlock(&LOCK_global_system_variables);
+ if (!engine)
+ {
+ /* The engine is gone from @@gtid_pos_auto_engines, so no action. */
+ goto end;
+ }
+
+ /*
+ Find the entry for the table to auto-create. The table name is copied
+ into a local buffer while LOCK_slave_state is held, so it can be used
+ after the mutex is released.
+ */
+ mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ entry= (rpl_slave_state::gtid_pos_table *)
+ rpl_global_gtid_slave_state->gtid_pos_tables;
+ while (entry)
+ {
+ if (entry->table_hton == hton &&
+ entry->state == rpl_slave_state::GTID_POS_CREATE_REQUESTED)
+ break;
+ entry= entry->next;
+ }
+ if (entry)
+ {
+ entry->state = rpl_slave_state::GTID_POS_CREATE_IN_PROGRESS;
+ err= loc_table_name.append(entry->table_name.str, entry->table_name.length);
+ }
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ if (!entry)
+ goto end; /* no pending request for this engine */
+ if (err)
+ {
+ sql_print_error("Out of memory while trying to auto-create GTID position table");
+ goto end;
+ }
+ table_name.str= loc_table_name.c_ptr_safe();
+ table_name.length= loc_table_name.length();
+
+ thd= new THD(next_thread_id()); /* temporary THD, deleted at the end label */
+ thd->thread_stack= (char*) &thd; /* Set approximate stack start */
+ thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND;
+ thd->store_globals();
+ thd->security_ctx->skip_grants();
+ thd->set_command(COM_DAEMON);
+ thd->variables.wsrep_on= 0;
+ err= gtid_pos_table_creation(thd, engine, &table_name);
+ if (err)
+ {
+ sql_print_error("Error auto-creating GTID position table `mysql.%s`: %s Error_code: %d",
+ table_name.str, thd->get_stmt_da()->message(),
+ thd->get_stmt_da()->sql_errno());
+ thd->clear_error();
+ goto end; /* the entry is left in state GTID_POS_CREATE_IN_PROGRESS */
+ }
+
+ /*
+ Now enable the entry for the auto-created table. The list is searched
+ again because LOCK_slave_state was released in between.
+ */
+ mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ entry= (rpl_slave_state::gtid_pos_table *)
+ rpl_global_gtid_slave_state->gtid_pos_tables;
+ while (entry)
+ {
+ if (entry->table_hton == hton &&
+ entry->state == rpl_slave_state::GTID_POS_CREATE_IN_PROGRESS)
+ {
+ entry->state= rpl_slave_state::GTID_POS_AVAILABLE;
+ break;
+ }
+ entry= entry->next;
+ }
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+
+end:
+ delete thd; /* thd is still NULL if we bailed out before creating it */
+ if (engine)
+ plugin_unlock(NULL, engine);
+}
-struct slave_background_kill_t {
- slave_background_kill_t *next;
- THD *to_kill;
-} *slave_background_kill_list;
+static bool slave_background_thread_gtid_loaded;
static void bg_rpl_load_gtid_slave_state(void *)
{
@@ -317,9 +490,7 @@ static void bg_rpl_load_gtid_slave_state(void *)
static void bg_slave_kill(void *victim)
{
THD *to_kill= (THD *)victim;
- mysql_mutex_lock(&to_kill->LOCK_thd_data);
to_kill->awake(KILL_CONNECTION);
- mysql_mutex_unlock(&to_kill->LOCK_thd_data);
mysql_mutex_lock(&to_kill->LOCK_wakeup_ready);
to_kill->rgi_slave->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED;
mysql_cond_broadcast(&to_kill->COND_wakeup_ready);
@@ -334,6 +505,30 @@ void slave_background_kill_request(THD *to_kill)
mysql_manager_submit(bg_slave_kill, to_kill);
}
+/*
+  Request auto-creation of the GTID position table described by table_entry.
+
+  If the entry is in state GTID_POS_AUTO_CREATE it is moved to
+  GTID_POS_CREATE_REQUESTED under LOCK_slave_state, and
+  bg_gtid_pos_auto_create() is submitted with mysql_manager_submit() for the
+  entry's table_hton. In any other state nothing is done.
+
+  This function must only be called from a slave SQL thread (or worker thread),
+  to ensure that the table_entry will not go away before we can lock the
+  LOCK_slave_state.
+*/
+void
+slave_background_gtid_pos_create_request(
+        rpl_slave_state::gtid_pos_table *table_entry)
+{
+  bool request_claimed= false;
+
+  /* First check without the mutex; the state is re-checked under the lock. */
+  if (table_entry->state != rpl_slave_state::GTID_POS_AUTO_CREATE)
+    return;
+
+  mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+  if (table_entry->state == rpl_slave_state::GTID_POS_AUTO_CREATE)
+  {
+    table_entry->state= rpl_slave_state::GTID_POS_CREATE_REQUESTED;
+    request_claimed= true;
+  }
+  mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+
+  /* Submit only after the mutex has been released. */
+  if (request_claimed)
+    mysql_manager_submit(bg_gtid_pos_auto_create, table_entry->table_hton);
+}
+
+
/* Initialize slave structures */
int init_slave()
@@ -391,15 +586,6 @@ int init_slave()
}
/*
- If --slave-skip-errors=... was not used, the string value for the
- system variable has not been set up yet. Do it now.
- */
- if (!use_slave_mask)
- {
- print_slave_skip_errors();
- }
-
- /*
If master_host is not specified, try to read it from the master_info file.
If master_host is specified, create the master_info file if it doesn't
exists.
@@ -431,7 +617,7 @@ int init_slave()
thd->reset_globals();
delete thd;
- if (error)
+ if (unlikely(error))
{
sql_print_error("Failed to create slave threads");
goto err;
@@ -496,12 +682,12 @@ int init_recovery(Master_info* mi, const char** errmsg)
DBUG_RETURN(0);
}
-
+
/**
Convert slave skip errors bitmap into a printable string.
*/
-static void print_slave_skip_errors(void)
+static void make_slave_skip_errors_printable(void)
{
/*
To be safe, we want 10 characters of room in the buffer for a number
@@ -510,7 +696,7 @@ static void print_slave_skip_errors(void)
plus a NUL terminator. That is a max 6 digit number.
*/
const size_t MIN_ROOM= 10;
- DBUG_ENTER("print_slave_skip_errors");
+ DBUG_ENTER("make_slave_skip_errors_printable");
DBUG_ASSERT(sizeof(slave_skip_error_names) > MIN_ROOM);
DBUG_ASSERT(MAX_SLAVE_ERROR <= 999999); // 6 digits
@@ -532,14 +718,14 @@ static void print_slave_skip_errors(void)
else
{
char *buff= slave_skip_error_names;
- char *bend= buff + sizeof(slave_skip_error_names);
+ char *bend= buff + sizeof(slave_skip_error_names) - MIN_ROOM;
int errnum;
for (errnum= 0; errnum < MAX_SLAVE_ERROR; errnum++)
{
if (bitmap_is_set(&slave_error_mask, errnum))
{
- if (buff + MIN_ROOM >= bend)
+ if (buff >= bend)
break; /* purecov: tested */
buff= int10_to_str(errnum, buff, 10);
*buff++= ',';
@@ -569,24 +755,24 @@ static void print_slave_skip_errors(void)
Called from get_options() in mysqld.cc on start-up
*/
-void init_slave_skip_errors(const char* arg)
+bool init_slave_skip_errors(const char* arg)
{
const char *p;
DBUG_ENTER("init_slave_skip_errors");
- if (my_bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
- {
- fprintf(stderr, "Badly out of memory, please check your system status\n");
- exit(1);
- }
- use_slave_mask = 1;
+ if (!arg || !*arg) // No errors defined
+ goto end;
+
+ if (unlikely(my_bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0)))
+ DBUG_RETURN(1);
+
+ use_slave_mask= 1;
for (;my_isspace(system_charset_info,*arg);++arg)
/* empty */;
if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4))
{
bitmap_set_all(&slave_error_mask);
- print_slave_skip_errors();
- DBUG_VOID_RETURN;
+ goto end;
}
for (p= arg ; *p; )
{
@@ -598,11 +784,109 @@ void init_slave_skip_errors(const char* arg)
while (!my_isdigit(system_charset_info,*p) && *p)
p++;
}
- /* Convert slave skip errors bitmap into a printable string. */
- print_slave_skip_errors();
+
+end:
+ make_slave_skip_errors_printable();
+ DBUG_RETURN(0);
+}
+
+/**
+  Build the printable, comma separated form of slave_transaction_retry_errors
+  in slave_transaction_retry_error_names and point
+  opt_slave_transaction_retry_errors at it.
+
+  If not all codes fit in the buffer, the list is cut short and "..." is
+  appended in place of the remaining codes.
+*/
+
+static void make_slave_transaction_retry_errors_printable(void)
+{
+  /*
+    MIN_ROOM bytes are kept free at the end of the buffer so that a new
+    number is only started while there is still space for it plus
+    {',' | '...'} and the NUL terminator.
+  */
+  const size_t MIN_ROOM= 10;
+  char *const names= slave_transaction_retry_error_names;
+  char *const limit= names + sizeof(slave_transaction_retry_error_names) -
+                     MIN_ROOM;
+  char *out= names;
+  uint idx= 0;
+  DBUG_ENTER("make_slave_transaction_retry_errors_printable");
+  DBUG_ASSERT(sizeof(slave_transaction_retry_error_names) > MIN_ROOM);
+
+  /* Make @@slave_transaction_retry_errors show a human-readable value */
+  opt_slave_transaction_retry_errors= slave_transaction_retry_error_names;
+
+  while (idx < slave_transaction_retry_error_length && out < limit)
+  {
+    out= int10_to_str(slave_transaction_retry_errors[idx], out, 10);
+    *out++= ',';
+    idx++;
+  }
+
+  /* Drop the trailing ',' if anything was written */
+  if (out != slave_transaction_retry_error_names)
+    out--;
+
+  /* Mark the list as truncated when some codes did not fit */
+  if (idx < slave_transaction_retry_error_length)
+    out= strmov(out, "...");                    /* purecov: tested */
+
+  *out= 0;
+  DBUG_PRINT("exit", ("error_names: '%s'",
+                      slave_transaction_retry_error_names));
DBUG_VOID_RETURN;
}
+
+/**
+  Set up slave_transaction_retry_errors from a list of error numbers.
+
+  ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT are always stored first. The
+  numbers parsed from arg follow; only values greater than 0 and below
+  ER_ERROR_LAST are kept. Parsing stops at the first position where
+  str2int() fails. The printable form of the list is rebuilt afterwards.
+
+  @param arg  List of error numbers; NULL is handled like an empty string
+
+  @retval 0  ok
+  @retval 1  the array could not be allocated
+*/
+bool init_slave_transaction_retry_errors(const char* arg)
+{
+  const char *cursor;
+  long code;
+  uint used;
+  DBUG_ENTER("init_slave_transaction_retry_errors");
+
+  /* Handle empty strings */
+  if (!arg)
+    arg= "";
+
+  while (my_isspace(system_charset_info, *arg))
+    arg++;
+
+  /* First pass: count the numbers, on top of the two built-in codes */
+  slave_transaction_retry_error_length= 2;
+  cursor= arg;
+  while (*cursor)
+  {
+    cursor= str2int(cursor, 10, 0, LONG_MAX, &code);
+    if (!cursor)
+      break;
+    slave_transaction_retry_error_length++;
+    /* Move on to the next digit, or to the end of the string */
+    while (*cursor && !my_isdigit(system_charset_info, *cursor))
+      cursor++;
+  }
+
+  slave_transaction_retry_errors=
+    (uint *) my_once_alloc(sizeof(*slave_transaction_retry_errors) *
+                           slave_transaction_retry_error_length,
+                           MYF(MY_WME));
+  if (unlikely(!slave_transaction_retry_errors))
+    DBUG_RETURN(1);
+
+  /*
+    Temporary error codes:
+    currently, InnoDB deadlock detected by InnoDB or lock
+    wait timeout (innodb_lock_wait_timeout exceeded)
+  */
+  slave_transaction_retry_errors[0]= ER_LOCK_DEADLOCK;
+  slave_transaction_retry_errors[1]= ER_LOCK_WAIT_TIMEOUT;
+
+  /* Second pass: store the user supplied codes after the built-in ones */
+  used= 2;
+  cursor= arg;
+  while (*cursor)
+  {
+    cursor= str2int(cursor, 10, 0, LONG_MAX, &code);
+    if (!cursor)
+      break;
+    if (code > 0 && code < ER_ERROR_LAST)
+      slave_transaction_retry_errors[used++]= (uint) code;
+    while (*cursor && !my_isdigit(system_charset_info, *cursor))
+      cursor++;
+  }
+  /* Out-of-range values were skipped, so record the number actually stored */
+  slave_transaction_retry_error_length= used;
+
+  make_slave_transaction_retry_errors_printable();
+  DBUG_RETURN(0);
+}
+
+
int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
{
DBUG_ENTER("terminate_slave_threads");
@@ -624,11 +908,12 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
}
else
mi->rli.abort_slave=1;
- if ((error=terminate_slave_thread(mi->rli.sql_driver_thd, sql_lock,
- &mi->rli.stop_cond,
- &mi->rli.slave_running,
- skip_lock)) &&
- !force_all)
+ if (unlikely((error= terminate_slave_thread(mi->rli.sql_driver_thd,
+ sql_lock,
+ &mi->rli.stop_cond,
+ &mi->rli.slave_running,
+ skip_lock))) &&
+ !force_all)
DBUG_RETURN(error);
retval= error;
@@ -646,11 +931,11 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
{
DBUG_PRINT("info",("Terminating IO thread"));
mi->abort_slave=1;
- if ((error=terminate_slave_thread(mi->io_thd, io_lock,
- &mi->stop_cond,
- &mi->slave_running,
- skip_lock)) &&
- !force_all)
+ if (unlikely((error= terminate_slave_thread(mi->io_thd, io_lock,
+ &mi->stop_cond,
+ &mi->slave_running,
+ skip_lock))) &&
+ !force_all)
DBUG_RETURN(error);
if (!retval)
retval= error;
@@ -671,7 +956,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
mysql_mutex_unlock(log_lock);
}
- DBUG_RETURN(retval);
+ DBUG_RETURN(retval);
}
@@ -754,7 +1039,7 @@ terminate_slave_thread(THD *thd,
int error __attribute__((unused));
DBUG_PRINT("loop", ("killing slave thread"));
- mysql_mutex_lock(&thd->LOCK_thd_data);
+ mysql_mutex_lock(&thd->LOCK_thd_kill);
#ifndef DONT_USE_THR_ALARM
/*
Error codes from pthread_kill are:
@@ -764,9 +1049,9 @@ terminate_slave_thread(THD *thd,
int err __attribute__((unused))= pthread_kill(thd->real_id, thr_client_alarm);
DBUG_ASSERT(err != EINVAL);
#endif
- thd->awake(NOT_KILLED);
+ thd->awake_no_mutex(NOT_KILLED);
- mysql_mutex_unlock(&thd->LOCK_thd_data);
+ mysql_mutex_unlock(&thd->LOCK_thd_kill);
/*
There is a small chance that slave thread might miss the first
@@ -826,8 +1111,9 @@ int start_slave_thread(
}
start_id= *slave_run_id;
DBUG_PRINT("info",("Creating new slave thread"));
- if ((error = mysql_thread_create(thread_key,
- &th, &connection_attrib, h_func, (void*)mi)))
+ if (unlikely((error= mysql_thread_create(thread_key,
+ &th, &connection_attrib, h_func,
+ (void*)mi))))
{
sql_print_error("Can't create slave thread (errno= %d).", error);
if (start_lock)
@@ -940,7 +1226,7 @@ int start_slave_threads(THD *thd,
mi->rli.restart_gtid_pos.reset();
}
- if (!error && (thread_mask & SLAVE_IO))
+ if (likely(!error) && likely((thread_mask & SLAVE_IO)))
error= start_slave_thread(
#ifdef HAVE_PSI_INTERFACE
key_thread_slave_io,
@@ -949,7 +1235,7 @@ int start_slave_threads(THD *thd,
cond_io,
&mi->slave_running, &mi->slave_run_id,
mi);
- if (!error && (thread_mask & SLAVE_SQL))
+ if (likely(!error) && likely(thread_mask & SLAVE_SQL))
{
error= start_slave_thread(
#ifdef HAVE_PSI_INTERFACE
@@ -959,7 +1245,7 @@ int start_slave_threads(THD *thd,
cond_sql,
&mi->rli.slave_running, &mi->rli.slave_run_id,
mi);
- if (error)
+ if (unlikely(error))
terminate_slave_threads(mi, thread_mask & SLAVE_IO, !need_slave_mutex);
}
DBUG_RETURN(error);
@@ -1184,7 +1470,7 @@ const char *print_slave_db_safe(const char* db)
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
const char *default_val)
{
- uint length;
+ size_t length;
DBUG_ENTER("init_strvar_from_file");
if ((length=my_b_gets(f,var, max_size)))
@@ -1794,7 +2080,7 @@ when it try to get the value of TIME_ZONE global variable from master.";
if (++dbug_count < 3)
goto heartbeat_network_error;
});
- if (mysql_real_query(mysql, query, strlen(query)))
+ if (mysql_real_query(mysql, query, (ulong)strlen(query)))
{
if (check_io_slave_killed(mi, NULL))
goto slave_killed_err;
@@ -1842,7 +2128,7 @@ when it try to get the value of TIME_ZONE global variable from master.";
Once the first FD will be received its alg descriptor will replace
the being queried one.
*/
- rc= mysql_real_query(mysql, query, strlen(query));
+ rc= mysql_real_query(mysql, query,(ulong)strlen(query));
if (rc != 0)
{
if (check_io_slave_killed(mi, NULL))
@@ -1931,7 +2217,8 @@ past_checksum:
*/
if (opt_replicate_events_marked_for_skip == RPL_SKIP_FILTER_ON_MASTER)
{
- if (mysql_real_query(mysql, STRING_WITH_LEN("SET skip_replication=1")))
+ if (unlikely(mysql_real_query(mysql,
+ STRING_WITH_LEN("SET skip_replication=1"))))
{
err_code= mysql_errno(mysql);
if (is_network_error(err_code))
@@ -1975,7 +2262,7 @@ past_checksum:
STRINGIFY_ARG(MARIA_SLAVE_CAPABILITY_ANNOTATE))),
mysql_real_query(mysql, STRING_WITH_LEN("SET @mariadb_slave_capability="
STRINGIFY_ARG(MARIA_SLAVE_CAPABILITY_MINE))));
- if (rc)
+ if (unlikely(rc))
{
err_code= mysql_errno(mysql);
if (is_network_error(err_code))
@@ -2051,7 +2338,7 @@ after_set_capability:
query_str.append(STRING_WITH_LEN("'"), system_charset_info);
rc= mysql_real_query(mysql, query_str.ptr(), query_str.length());
- if (rc)
+ if (unlikely(rc))
{
err_code= mysql_errno(mysql);
if (is_network_error(err_code))
@@ -2084,7 +2371,7 @@ after_set_capability:
}
rc= mysql_real_query(mysql, query_str.ptr(), query_str.length());
- if (rc)
+ if (unlikely(rc))
{
err_code= mysql_errno(mysql);
if (is_network_error(err_code))
@@ -2117,7 +2404,7 @@ after_set_capability:
}
rc= mysql_real_query(mysql, query_str.ptr(), query_str.length());
- if (rc)
+ if (unlikely(rc))
{
err_code= mysql_errno(mysql);
if (is_network_error(err_code))
@@ -2153,7 +2440,7 @@ after_set_capability:
query_str.append(STRING_WITH_LEN("'"), system_charset_info);
rc= mysql_real_query(mysql, query_str.ptr(), query_str.length());
- if (rc)
+ if (unlikely(rc))
{
err_code= mysql_errno(mysql);
if (is_network_error(err_code))
@@ -2423,7 +2710,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi,
bool *suppress_warnings)
{
uchar buf[1024], *pos= buf;
- uint report_host_len=0, report_user_len=0, report_password_len=0;
+ size_t report_host_len=0, report_user_len=0, report_password_len=0;
DBUG_ENTER("register_slave_on_master");
*suppress_warnings= FALSE;
@@ -2431,7 +2718,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi,
report_host_len= strlen(report_host);
if (report_host_len > HOSTNAME_LENGTH)
{
- sql_print_warning("The length of report_host is %d. "
+ sql_print_warning("The length of report_host is %zu. "
"It is larger than the max length(%d), so this "
"slave cannot be registered to the master.",
report_host_len, HOSTNAME_LENGTH);
@@ -2442,7 +2729,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi,
report_user_len= strlen(report_user);
if (report_user_len > USERNAME_LENGTH)
{
- sql_print_warning("The length of report_user is %d. "
+ sql_print_warning("The length of report_user is %zu. "
"It is larger than the max length(%d), so this "
"slave cannot be registered to the master.",
report_user_len, USERNAME_LENGTH);
@@ -2453,7 +2740,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi,
report_password_len= strlen(report_password);
if (report_password_len > MAX_PASSWORD_LENGTH)
{
- sql_print_warning("The length of report_password is %d. "
+ sql_print_warning("The length of report_password is %zu. "
"It is larger than the max length(%d), so this "
"slave cannot be registered to the master.",
report_password_len, MAX_PASSWORD_LENGTH);
@@ -2474,7 +2761,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi,
/* The master will fill in master_id */
int4store(pos, 0); pos+= 4;
- if (simple_command(mysql, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0))
+ if (simple_command(mysql, COM_REGISTER_SLAVE, buf, (ulong) (pos- buf), 0))
{
if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
{
@@ -2718,6 +3005,19 @@ void show_master_info_get_fields(THD *thd, List<Item> *field_list,
field_list->push_back(new (mem_root)
Item_empty_string(thd, "Slave_SQL_Running_State",
20));
+ field_list->push_back(new (mem_root)
+ Item_return_int(thd, "Slave_DDL_Groups", 20,
+ MYSQL_TYPE_LONGLONG),
+ mem_root);
+ field_list->push_back(new (mem_root)
+ Item_return_int(thd, "Slave_Non_Transactional_Groups", 20,
+ MYSQL_TYPE_LONGLONG),
+ mem_root);
+ field_list->push_back(new (mem_root)
+ Item_return_int(thd, "Slave_Transactional_Groups", 20,
+ MYSQL_TYPE_LONGLONG),
+ mem_root);
+
if (full)
{
field_list->push_back(new (mem_root)
@@ -2741,7 +3041,7 @@ void show_master_info_get_fields(THD *thd, List<Item> *field_list,
mem_root);
field_list->push_back(new (mem_root)
Item_empty_string(thd, "Gtid_Slave_Pos",
- gtid_pos_length),
+ (uint)gtid_pos_length),
mem_root);
}
DBUG_VOID_RETURN;
@@ -2951,6 +3251,17 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
// Slave_SQL_Running_State
protocol->store(slave_sql_running_state, &my_charset_bin);
+ uint64 events;
+ events= (uint64)my_atomic_load64_explicit((volatile int64 *)
+ &mi->total_ddl_groups, MY_MEMORY_ORDER_RELAXED);
+ protocol->store(events);
+ events= (uint64)my_atomic_load64_explicit((volatile int64 *)
+ &mi->total_non_trans_groups, MY_MEMORY_ORDER_RELAXED);
+ protocol->store(events);
+ events= (uint64)my_atomic_load64_explicit((volatile int64 *)
+ &mi->total_trans_groups, MY_MEMORY_ORDER_RELAXED);
+ protocol->store(events);
+
if (full)
{
protocol->store((uint32) mi->rli.retried_trans);
@@ -3068,6 +3379,13 @@ void set_slave_thread_options(THD* thd)
options&= ~OPTION_BIN_LOG;
thd->variables.option_bits= options;
thd->variables.completion_type= 0;
+
+ /* For easier test in LOGGER::log_command */
+ if (thd->variables.log_disabled_statements & LOG_DISABLE_SLAVE)
+ thd->variables.option_bits|= OPTION_LOG_OFF;
+
+ thd->variables.sql_log_slow= !MY_TEST(thd->variables.log_slow_disabled_statements &
+ LOG_SLOW_DISABLE_SLAVE);
DBUG_VOID_RETURN;
}
@@ -3114,8 +3432,7 @@ static int init_slave_thread(THD* thd, Master_info *mi,
thd->security_ctx->skip_grants();
thd->slave_thread= 1;
thd->connection_name= mi->connection_name;
- thd->variables.sql_log_slow= opt_log_slow_slave_statements;
- thd->variables.log_slow_filter= global_system_variables.log_slow_filter;
+ thd->variables.sql_log_slow= !MY_TEST(thd->variables.log_slow_disabled_statements & LOG_SLOW_DISABLE_SLAVE);
set_slave_thread_options(thd);
if (thd_type == SLAVE_THD_SQL)
@@ -3180,11 +3497,9 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
if (opt_log_slave_updates && opt_replicate_annotate_row_events)
binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT;
- if (RUN_HOOK(binlog_relay_io,
- before_request_transmit,
- (thd, mi, binlog_flags)))
+ if (repl_semisync_slave.request_transmit(mi))
DBUG_RETURN(1);
-
+
// TODO if big log files: Change next to int8store()
int4store(buf, (ulong) mi->master_log_pos);
int2store(buf + 4, binlog_flags);
@@ -3247,7 +3562,7 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings,
#endif
len = cli_safe_read_reallen(mysql, network_read_len);
- if (len == packet_error || (long) len < 1)
+ if (unlikely(len == packet_error || (long) len < 1))
{
if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
{
@@ -3256,7 +3571,8 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings,
we suppress prints to .err file as long as the reconnect
happens without problems
*/
- *suppress_warnings= TRUE;
+ *suppress_warnings=
+ global_system_variables.log_warnings < 2 ? TRUE : FALSE;
}
else
{
@@ -3283,14 +3599,20 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings,
DBUG_RETURN(len - 1);
}
-/*
+
+/**
Check if the current error is of temporary nature or not.
Some errors are temporary in nature, such as
ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT.
+
+ @retval 0 if fatal error
+ @retval 1 temporary error, do retry
*/
+
int
has_temporary_error(THD *thd)
{
+ uint current_errno;
DBUG_ENTER("has_temporary_error");
DBUG_EXECUTE_IF("all_errors_are_temporary_errors",
@@ -3305,17 +3627,15 @@ has_temporary_error(THD *thd)
error or not. This is currently the case for Incident_log_event,
which sets no message. Return FALSE.
*/
- if (!thd->is_error())
+ if (!likely(thd->is_error()))
DBUG_RETURN(0);
- /*
- Temporary error codes:
- currently, InnoDB deadlock detected by InnoDB or lock
- wait timeout (innodb_lock_wait_timeout exceeded
- */
- if (thd->get_stmt_da()->sql_errno() == ER_LOCK_DEADLOCK ||
- thd->get_stmt_da()->sql_errno() == ER_LOCK_WAIT_TIMEOUT)
- DBUG_RETURN(1);
+ current_errno= thd->get_stmt_da()->sql_errno();
+ for (uint i= 0; i < slave_transaction_retry_error_length; i++)
+ {
+ if (current_errno == slave_transaction_retry_errors[i])
+ DBUG_RETURN(1);
+ }
DBUG_RETURN(0);
}
@@ -3542,7 +3862,7 @@ apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi,
TODO: Replace this with a decent error message when merged
with BUG#24954 (which adds several new error message).
*/
- if (error)
+ if (unlikely(error))
{
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR, rgi->gtid_info(),
"It was not possible to update the positions"
@@ -3647,10 +3967,7 @@ int
apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd,
rpl_group_info *rgi)
{
-#ifndef DBUG_OFF
- Relay_log_info* rli= rgi->rli;
-#endif
- mysql_mutex_assert_not_owner(&rli->data_lock);
+ mysql_mutex_assert_not_owner(&rgi->rli->data_lock);
int reason= apply_event_and_update_pos_setup(ev, thd, rgi);
/*
In parallel replication, sql_slave_skip_counter is handled in the SQL
@@ -3945,7 +4262,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
update_log_pos failed: this should not happen, so we don't
retry.
*/
- if (exec_res == 2)
+ if (unlikely(exec_res == 2))
DBUG_RETURN(1);
#ifdef WITH_WSREP
@@ -3957,7 +4274,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
if (slave_trans_retries)
{
int UNINIT_VAR(temp_err);
- if (exec_res && (temp_err= has_temporary_error(thd)))
+ if (unlikely(exec_res) && (temp_err= has_temporary_error(thd)))
{
const char *errmsg;
rli->clear_error();
@@ -3992,8 +4309,9 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
exec_res= 0;
serial_rgi->cleanup_context(thd, 1);
/* chance for concurrent connection to get more locks */
- slave_sleep(thd, MY_MIN(serial_rgi->trans_retries,
+ slave_sleep(thd, MY_MAX(MY_MIN(serial_rgi->trans_retries,
MAX_SLAVE_RETRY_PAUSE),
+ slave_trans_retry_interval),
sql_slave_killed, serial_rgi);
serial_rgi->trans_retries++;
mysql_mutex_lock(&rli->data_lock); // because of SHOW STATUS
@@ -4087,7 +4405,7 @@ static bool check_io_slave_killed(Master_info *mi, const char *info)
@param[in] mysql MySQL connection.
@param[in] mi Master connection information.
@param[in,out] retry_count Number of attempts to reconnect.
- @param[in] suppress_warnings TRUE when a normal net read timeout
+ @param[in] suppress_warnings TRUE when a normal net read timeout
has caused to reconnecting.
@param[in] messages Messages to print/log, see
reconnect_messages[] array.
@@ -4216,6 +4534,7 @@ pthread_handler_t handle_slave_io(void *arg)
mi->abort_slave = 0;
mysql_mutex_unlock(&mi->run_lock);
mysql_cond_broadcast(&mi->start_cond);
+ mi->rows_event_tracker.reset();
DBUG_PRINT("master_info",("log_file_name: '%s' position: %llu",
mi->master_log_name, mi->master_log_pos));
@@ -4240,7 +4559,8 @@ pthread_handler_t handle_slave_io(void *arg)
}
thd->variables.wsrep_on= 0;
- if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
+ if (DBUG_EVALUATE_IF("failed_slave_start", 1, 0)
+ || repl_semisync_slave.slave_start(mi))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER_THD(thd, ER_SLAVE_FATAL_ERROR),
@@ -4298,6 +4618,10 @@ connected:
*/
mi->gtid_reconnect_event_skip_count= mi->events_queued_since_last_gtid;
mi->gtid_event_seen= false;
+ /*
+ Reset stale state of the rows-event group tracker at reconnect.
+ */
+ mi->rows_event_tracker.reset();
}
#ifdef ENABLED_DEBUG_SYNC
@@ -4397,7 +4721,7 @@ connected:
if (check_io_slave_killed(mi, NullS))
goto err;
- if (event_len == packet_error)
+ if (unlikely(event_len == packet_error))
{
uint mysql_error_number= mysql_errno(mysql);
switch (mysql_error_number) {
@@ -4431,9 +4755,10 @@ Stopping slave I/O thread due to out-of-memory error from master");
retry_count=0; // ok event, reset retry counter
THD_STAGE_INFO(thd, stage_queueing_master_event_to_the_relay_log);
event_buf= (const char*)mysql->net.read_pos + 1;
- if (RUN_HOOK(binlog_relay_io, after_read_event,
- (thd, mi,(const char*)mysql->net.read_pos + 1,
- event_len, &event_buf, &event_len)))
+ mi->semi_ack= 0;
+ if (repl_semisync_slave.
+ slave_read_sync_header((const char*)mysql->net.read_pos + 1, event_len,
+ &(mi->semi_ack), &event_buf, &event_len))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER_THD(thd, ER_SLAVE_FATAL_ERROR),
@@ -4482,9 +4807,6 @@ Stopping slave I/O thread due to out-of-memory error from master");
tokenamount -= network_read_len;
}
- /* XXX: 'synced' should be updated by queue_event to indicate
- whether event has been synced to disk */
- bool synced= 0;
if (queue_event(mi, event_buf, event_len))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL,
@@ -4493,17 +4815,27 @@ Stopping slave I/O thread due to out-of-memory error from master");
goto err;
}
- if (RUN_HOOK(binlog_relay_io, after_queue_event,
- (thd, mi, event_buf, event_len, synced)))
+ if (rpl_semi_sync_slave_status && (mi->semi_ack & SEMI_SYNC_NEED_ACK))
{
- mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
- ER_THD(thd, ER_SLAVE_FATAL_ERROR),
- "Failed to run 'after_queue_event' hook");
- goto err;
+ /*
+ We deliberately ignore the error in slave_reply, such error should
+ not cause the slave IO thread to stop, and the error messages are
+ already reported.
+ */
+ (void)repl_semisync_slave.slave_reply(mi);
}
if (mi->using_gtid == Master_info::USE_GTID_NO &&
- flush_master_info(mi, TRUE, TRUE))
+ /*
+ If rpl_semi_sync_slave_delay_master is enabled, we will flush
+ master info only when ack is needed. This may lead to at least one
+ group transaction delay but affords better performance improvement.
+ */
+ (!repl_semisync_slave.get_slave_enabled() ||
+ (!(mi->semi_ack & SEMI_SYNC_SLAVE_DELAY_SYNC) ||
+ (mi->semi_ack & (SEMI_SYNC_NEED_ACK)))) &&
+ (DBUG_EVALUATE_IF("failed_flush_master_info", 1, 0) ||
+ flush_master_info(mi, TRUE, TRUE)))
{
sql_print_error("Failed to flush master info file");
goto err;
@@ -4563,9 +4895,9 @@ err:
tmp.c_ptr_safe());
sql_print_information("master was %s:%d", mi->host, mi->port);
}
- RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
+ repl_semisync_slave.slave_stop(mi);
thd->reset_query();
- thd->reset_db(NULL, 0);
+ thd->reset_db(&null_clex_str);
if (mysql)
{
/*
@@ -4597,9 +4929,7 @@ err_during_init:
// TODO: make rpl_status part of Master_info
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
- mysql_mutex_lock(&LOCK_thread_count);
- thd->unlink();
- mysql_mutex_unlock(&LOCK_thread_count);
+ thd->assert_not_linked();
delete thd;
thread_safe_decrement32(&service_thread_count);
signal_thd_deleted();
@@ -4701,7 +5031,7 @@ slave_output_error_info(rpl_group_info *rgi, THD *thd)
Relay_log_info *rli= rgi->rli;
uint32 const last_errno= rli->last_error().number;
- if (thd->is_error())
+ if (unlikely(thd->is_error()))
{
char const *const errmsg= thd->get_stmt_da()->message();
@@ -4745,7 +5075,7 @@ slave_output_error_info(rpl_group_info *rgi, THD *thd)
udf_error = true;
sql_print_warning("Slave: %s Error_code: %d", err->get_message_text(), err->get_sql_errno());
}
- if (udf_error)
+ if (unlikely(udf_error))
{
StringBuffer<100> tmp;
if (rli->mi->using_gtid != Master_info::USE_GTID_NO)
@@ -4873,6 +5203,10 @@ pthread_handler_t handle_slave_sql(void *arg)
applied. In all other cases it must be FALSE.
*/
thd->variables.binlog_annotate_row_events= 0;
+
+ /* Ensure that slave can execute any alter table it gets from master */
+ thd->variables.alter_algorithm= (ulong) Alter_info::ALTER_TABLE_ALGORITHM_DEFAULT;
+
add_to_active_threads(thd);
/*
We are going to set slave_running to 1. Assuming slave I/O thread is
@@ -5013,12 +5347,20 @@ pthread_handler_t handle_slave_sql(void *arg)
if (mi->using_gtid != Master_info::USE_GTID_NO || opt_gtid_strict_mode)
goto err;
}
+ /* Re-load the set of mysql.gtid_slave_posXXX tables available. */
+ if (find_gtid_slave_pos_tables(thd))
+ {
+ rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL,
+ "Error processing replication GTID position tables: %s",
+ thd->get_stmt_da()->message());
+ goto err;
+ }
/* execute init_slave variable */
if (opt_init_slave.length)
{
execute_init_command(thd, &opt_init_slave, &LOCK_sys_init_slave);
- if (thd->is_slave_error)
+ if (unlikely(thd->is_slave_error))
{
rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL,
"Slave SQL thread aborted. Can't execute init_slave query");
@@ -5170,7 +5512,7 @@ pthread_handler_t handle_slave_sql(void *arg)
*/
thd->catalog= 0;
thd->reset_query();
- thd->reset_db(NULL, 0);
+ thd->reset_db(&null_clex_str);
if (rli->mi->using_gtid != Master_info::USE_GTID_NO)
{
ulong domain_count;
@@ -5295,11 +5637,7 @@ err_during_init:
rpl_parallel_resize_pool_if_no_slaves();
- /* TODO: Check if this lock is needed */
- mysql_mutex_lock(&LOCK_thread_count);
delete serial_rgi;
- mysql_mutex_unlock(&LOCK_thread_count);
-
delete thd;
thread_safe_decrement32(&service_thread_count);
signal_thd_deleted();
@@ -5712,7 +6050,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
{
int error= 0;
StringBuffer<1024> error_msg;
- ulonglong inc_pos;
+ ulonglong inc_pos= 0;
ulonglong event_pos;
Relay_log_info *rli= &mi->rli;
mysql_mutex_t *log_lock= rli->relay_log.get_log_lock();
@@ -5726,7 +6064,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
char* new_buf = NULL;
char new_buf_arr[4096];
bool is_malloc = false;
-
+ bool is_rows_event= false;
/*
FD_q must have been prepared for the first R_a event
inside get_master_version_and_clock()
@@ -5811,6 +6149,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
DBUG_ASSERT(debug_sync_service);
DBUG_ASSERT(!debug_sync_set_action(current_thd,
STRING_WITH_LEN(act)));
+ dbug_rows_event_count = 0;
};);
#endif
mysql_mutex_lock(&mi->data_lock);
@@ -5908,7 +6247,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mysql_mutex_unlock(log_lock);
goto err;
}
- rli->relay_log.signal_update();
+ rli->relay_log.signal_relay_log_update();
mysql_mutex_unlock(log_lock);
mi->gtid_reconnect_event_skip_count= 0;
@@ -6160,11 +6499,11 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
got_gtid_event= true;
if (mi->using_gtid == Master_info::USE_GTID_NO)
goto default_action;
- if (unlikely(!mi->gtid_event_seen))
+ if (unlikely(mi->gtid_reconnect_event_skip_count))
{
- mi->gtid_event_seen= true;
- if (mi->gtid_reconnect_event_skip_count)
+ if (likely(!mi->gtid_event_seen))
{
+ mi->gtid_event_seen= true;
/*
If we are reconnecting, and we need to skip a partial event group
already queued to the relay log before the reconnect, then we check
@@ -6193,13 +6532,45 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
rpl_slave_state_tostring_helper(&error_msg, &event_gtid, &first);
goto err;
}
+ if (global_system_variables.log_warnings > 1)
+ {
+ bool first= true;
+ StringBuffer<1024> gtid_text;
+ rpl_slave_state_tostring_helper(&gtid_text, &mi->last_queued_gtid,
+ &first);
+ sql_print_information("Slave IO thread is reconnected to "
+ "receive Gtid_log_event %s. It is to skip %llu "
+ "already received events including the gtid one",
+ gtid_text.ptr(),
+ mi->events_queued_since_last_gtid);
+ }
+ goto default_action;
}
- }
+ else
+ {
+ bool first;
+ StringBuffer<1024> gtid_text;
- if (unlikely(mi->gtid_reconnect_event_skip_count))
- {
- goto default_action;
+ gtid_text.append(STRING_WITH_LEN("Last received gtid: "));
+ first= true;
+ rpl_slave_state_tostring_helper(&gtid_text, &mi->last_queued_gtid,
+ &first);
+ gtid_text.append(STRING_WITH_LEN(", currently received: "));
+ first= true;
+ rpl_slave_state_tostring_helper(&gtid_text, &event_gtid, &first);
+
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ sql_print_error("Slave IO thread has received a new Gtid_log_event "
+ "while skipping already logged events "
+ "after reconnect. %s. %llu remains to be skipped. "
+ "The number of originally read events was %llu",
+ gtid_text.ptr(),
+ mi->gtid_reconnect_event_skip_count,
+ mi->events_queued_since_last_gtid);
+ goto err;
+ }
}
+ mi->gtid_event_seen= true;
/*
We have successfully queued to relay log everything before this GTID, so
@@ -6266,8 +6637,34 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
goto err;
}
}
- buf = new_buf;
is_compress_event = true;
+ buf = new_buf;
+ /*
+ As we are uncertain about compressed V2 rows events, we don't track
+ them
+ */
+ if (LOG_EVENT_IS_ROW_V2((Log_event_type) buf[EVENT_TYPE_OFFSET]))
+ goto default_action;
+ /* fall through */
+ case WRITE_ROWS_EVENT_V1:
+ case UPDATE_ROWS_EVENT_V1:
+ case DELETE_ROWS_EVENT_V1:
+ case WRITE_ROWS_EVENT:
+ case UPDATE_ROWS_EVENT:
+ case DELETE_ROWS_EVENT:
+ {
+ is_rows_event= true;
+ mi->rows_event_tracker.update(mi->master_log_name,
+ mi->master_log_pos,
+ buf,
+ mi->rli.relay_log.
+ description_event_for_queue);
+
+ DBUG_EXECUTE_IF("simulate_stmt_end_rows_event_loss",
+ {
+ mi->rows_event_tracker.stmt_end_seen= false;
+ });
+ }
goto default_action;
#ifndef DBUG_OFF
@@ -6337,6 +6734,21 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
/*
+ Integrity check of a Rows-event group.
+ A sequence of Rows events must end with one flagged STMT_END_F.
+ Even when Heartbeat event interrupts Rows- events flow this must indicate a
+ malfunction e.g logging on the master.
+ */
+ if (((uchar) buf[EVENT_TYPE_OFFSET] != HEARTBEAT_LOG_EVENT) &&
+ !is_rows_event &&
+ mi->rows_event_tracker.check_and_report(mi->master_log_name,
+ mi->master_log_pos))
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ goto err;
+ }
+
+ /*
If we filter events master-side (eg. @@skip_replication), we will see holes
in the event positions from the master. If we see such a hole, adjust
mi->master_log_pos accordingly so we maintain the correct position (for
@@ -6464,7 +6876,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
if (got_gtid_event)
rli->ign_gtids.update(&event_gtid);
}
- rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
+ // the slave SQL thread needs to re-check
+ rli->relay_log.signal_relay_log_update();
DBUG_PRINT("info", ("master_log_pos: %lu, event originating from %u server, ignored",
(ulong) mi->master_log_pos, uint4korr(buf + SERVER_ID_OFFSET)));
}
@@ -6488,7 +6901,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
mysql_mutex_unlock(log_lock);
- if (!error &&
+ if (likely(!error) &&
mi->using_gtid != Master_info::USE_GTID_NO &&
mi->events_queued_since_last_gtid > 0 &&
( (mi->last_queued_gtid_standalone &&
@@ -6504,6 +6917,21 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
The whole of the current event group is queued. So in case of
reconnect we can start from after the current GTID.
*/
+ if (mi->gtid_reconnect_event_skip_count)
+ {
+ bool first= true;
+ StringBuffer<1024> gtid_text;
+
+ rpl_slave_state_tostring_helper(&gtid_text, &mi->last_queued_gtid,
+ &first);
+ sql_print_error("Slave IO thread received a terminal event from "
+ "group %s whose retrieval was interrupted "
+ "with reconnect. We still had %llu events to read. "
+ "The number of originally read events was %llu",
+ gtid_text.ptr(),
+ mi->gtid_reconnect_event_skip_count,
+ mi->events_queued_since_last_gtid);
+ }
mi->gtid_current_pos.update(&mi->last_queued_gtid);
mi->events_queued_since_last_gtid= 0;
@@ -6522,11 +6950,11 @@ err:
Do not print ER_SLAVE_RELAY_LOG_WRITE_FAILURE error here, as the caller
handle_slave_io() prints it on return.
*/
- if (error && error != ER_SLAVE_RELAY_LOG_WRITE_FAILURE)
+ if (unlikely(error) && error != ER_SLAVE_RELAY_LOG_WRITE_FAILURE)
mi->report(ERROR_LEVEL, error, NULL, ER_DEFAULT(error),
error_msg.ptr());
- if(is_malloc)
+ if (unlikely(is_malloc))
my_free((void *)new_buf);
DBUG_RETURN(error);
@@ -6974,7 +7402,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
MYSQL_BIN_LOG::open() will write the buffered description event.
*/
old_pos= rli->event_relay_log_pos;
- if ((ev= Log_event::read_log_event(cur_log,0,
+ if ((ev= Log_event::read_log_event(cur_log,
rli->relay_log.description_event_for_exec,
opt_slave_sql_verify_checksum)))
@@ -6993,7 +7421,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
}
if (opt_reckless_slave) // For mysql-test
cur_log->error = 0;
- if (cur_log->error < 0)
+ if (unlikely(cur_log->error < 0))
{
errmsg = "slave SQL thread aborted because of I/O error";
if (hot_log)
@@ -7538,6 +7966,92 @@ bool rpl_master_erroneous_autoinc(THD *thd)
return FALSE;
}
+/*
+ Tell whether a Rows event carries the end-of-statement flag.
+
+ The event type byte is read at EVENT_TYPE_OFFSET and used to look up the
+ post-header length in the format description event. The flags are then
+ read with uint2korr() past the common header and tested for STMT_END_F.
+
+ No bounds checks are made here on the type byte or on the buffer length.
+ NOTE(review): presumably callers only pass Rows events - verify.
+
+ @param buf    raw event buffer, starting at the common header
+ @param fdle   format description event supplying common_header_len and
+               post_header_len[]
+
+ @return true when STMT_END_F is set in the event's flags, false otherwise.
+*/
+static bool get_row_event_stmt_end(const char* buf,
+ const Format_description_log_event *fdle)
+{
+ uint8 const common_header_len= fdle->common_header_len;
+ Log_event_type event_type= (Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET];
+
+ uint8 const post_header_len= fdle->post_header_len[event_type-1];
+ const char *flag_start= buf + common_header_len;
+ /*
+ The flags follow the table map id. When the post-header length is 6 the
+ offset used is 4 instead of RW_FLAGS_OFFSET.
+ The term 4 below signifies that master is of 'an intermediate source', see
+ Rows_log_event::Rows_log_event.
+ */
+ flag_start += RW_MAPID_OFFSET + ((post_header_len == 6) ? 4 : RW_FLAGS_OFFSET);
+
+ return (uint2korr(flag_start) & Rows_log_event::STMT_END_F) != 0;
+}
+
+
+/*
+ Reset log event tracking data.
+
+ Empties the remembered binlog file name, zeroes the first- and last-seen
+ positions and clears the end-of-statement flag. A zero last_seen is what
+ check_and_report() treats as "no Rows event seen".
+*/
+
+void Rows_event_tracker::reset()
+{
+ binlog_file_name[0]= 0;
+ first_seen= last_seen= 0;
+ stmt_end_seen= false;
+}
+
+
+/*
+ Update log event tracking data.
+
+ The first- and last- seen event binlog position get memorized, as
+ well as the end-of-statement status of the last one.
+
+ The first-seen position and the binlog file name are stored only while
+ first_seen is still zero (its value after reset()); last_seen and
+ stmt_end_seen are overwritten on every call.
+
+ @param file_name  binlog file name remembered together with first_seen
+ @param pos        binlog position recorded for this event
+ @param buf        raw Rows event, examined for the STMT_END_F flag
+ @param fdle       format description event used to locate the flags in buf
+*/
+
+void Rows_event_tracker::update(const char* file_name, my_off_t pos,
+ const char* buf,
+ const Format_description_log_event *fdle)
+{
+ if (!first_seen)
+ {
+ first_seen= pos;
+ strmake(binlog_file_name, file_name, sizeof(binlog_file_name) - 1);
+ }
+ last_seen= pos;
+ DBUG_ASSERT(stmt_end_seen == 0); // Only one end-of-statement event is expected
+ stmt_end_seen= get_row_event_stmt_end(buf, fdle);
+};
+
+
+/**
+ The function is called at next event reading
+ after a sequence of Rows- log-events. It checks the end-of-statement status
+ of the past sequence to report on any issue.
+ In the positive case the tracker gets reset.
+
+ When no Rows event has been tracked (last_seen is zero) nothing is checked.
+ When the check fails the error is written to the error log and the tracker
+ is left as is, i.e. it is not reset here.
+
+ @param file_name  binlog file name of the current event; used only in the
+                   error message
+ @param pos        binlog position of the current event; used only in the
+                   error message
+
+ @return true when the Rows- event group integrity found compromised,
+ false otherwise.
+*/
+bool Rows_event_tracker::check_and_report(const char* file_name,
+ my_off_t pos)
+{
+ if (last_seen)
+ {
+ // at least one Rows "block" event has been tracked since the last reset
+ if (!stmt_end_seen)
+ {
+ sql_print_error("Slave IO thread did not receive an expected "
+ "Rows-log end-of-statement for event starting "
+ "at log '%s' position %llu "
+ "whose last block was seen at log '%s' position %llu. "
+ "The end-of-statement should have been delivered "
+ "before the current one at log '%s' position %llu",
+ binlog_file_name, first_seen,
+ binlog_file_name, last_seen, file_name, pos);
+ return true;
+ }
+ reset();
+ }
+
+ return false;
+}
+
/**
@} (end of group Replication)
*/