summary refs log tree commit diff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- sql/slave.cc 313
1 files changed, 230 insertions, 83 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index ca29410cd1d..6e70f090247 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -302,7 +302,10 @@ handle_slave_init(void *arg __attribute__((unused)))
mysql_mutex_lock(&LOCK_thread_count);
thd->thread_id= thread_id++;
mysql_mutex_unlock(&LOCK_thread_count);
+ thd->system_thread = SYSTEM_THREAD_SLAVE_INIT;
thd->store_globals();
+ thd->security_ctx->skip_grants();
+ thd->set_command(COM_DAEMON);
thd_proc_info(thd, "Loading slave GTID position from table");
if (rpl_load_gtid_slave_state(thd))
@@ -317,15 +320,22 @@ handle_slave_init(void *arg __attribute__((unused)))
mysql_mutex_unlock(&LOCK_thread_count);
my_thread_end();
- mysql_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_slave_init);
slave_init_thread_running= false;
- mysql_cond_broadcast(&COND_thread_count);
- mysql_mutex_unlock(&LOCK_thread_count);
+ mysql_cond_broadcast(&COND_slave_init);
+ mysql_mutex_unlock(&LOCK_slave_init);
return 0;
}
+/*
+ Start the slave init thread.
+
+ This thread is used to load the GTID state from mysql.gtid_slave_pos at
+ server start; reading from the table requires a valid THD, which is
+ otherwise not available during server init.
+*/
static int
run_slave_init_thread()
{
@@ -339,10 +349,10 @@ run_slave_init_thread()
return 1;
}
- mysql_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_slave_init);
while (slave_init_thread_running)
- mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
- mysql_mutex_unlock(&LOCK_thread_count);
+ mysql_cond_wait(&COND_slave_init, &LOCK_slave_init);
+ mysql_mutex_unlock(&LOCK_slave_init);
return 0;
}
@@ -1090,21 +1100,21 @@ static bool sql_slave_killed(rpl_group_info *rgi)
if (ret == 0)
{
- rli->report(WARNING_LEVEL, 0,
+ rli->report(WARNING_LEVEL, 0, rgi->gtid_info(),
"Request to stop slave SQL Thread received while "
"applying a group that has non-transactional "
"changes; waiting for completion of the group ... ");
}
else
{
- rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, rgi->gtid_info(),
ER(ER_SLAVE_FATAL_ERROR), msg_stopped);
}
}
else
{
ret= TRUE;
- rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, rgi->gtid_info(),
ER(ER_SLAVE_FATAL_ERROR),
msg_stopped);
}
@@ -1522,7 +1532,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
goto slave_killed_err;
else if (is_network_error(mysql_errno(mysql)))
{
- mi->report(WARNING_LEVEL, mysql_errno(mysql),
+ mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL,
"Get master clock failed with error: %s", mysql_error(mysql));
goto network_err;
}
@@ -1587,7 +1597,7 @@ not always make sense; please check the manual before using it).";
goto slave_killed_err;
else if (is_network_error(mysql_errno(mysql)))
{
- mi->report(WARNING_LEVEL, mysql_errno(mysql),
+ mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL,
"Get master SERVER_ID failed with error: %s", mysql_error(mysql));
goto network_err;
}
@@ -1600,7 +1610,7 @@ when it try to get the value of SERVER_ID variable from master.";
}
else if (!master_row && master_res)
{
- mi->report(WARNING_LEVEL, ER_UNKNOWN_SYSTEM_VARIABLE,
+ mi->report(WARNING_LEVEL, ER_UNKNOWN_SYSTEM_VARIABLE, NULL,
"Unknown system variable 'SERVER_ID' on master, \
maybe it is a *VERY OLD MASTER*.");
}
@@ -1660,7 +1670,7 @@ be equal for the Statement-format replication to work";
goto slave_killed_err;
else if (is_network_error(mysql_errno(mysql)))
{
- mi->report(WARNING_LEVEL, mysql_errno(mysql),
+ mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL,
"Get master COLLATION_SERVER failed with error: %s", mysql_error(mysql));
goto network_err;
}
@@ -1674,7 +1684,7 @@ when it try to get the value of COLLATION_SERVER global variable from master.";
goto err;
}
else
- mi->report(WARNING_LEVEL, ER_UNKNOWN_SYSTEM_VARIABLE,
+ mi->report(WARNING_LEVEL, ER_UNKNOWN_SYSTEM_VARIABLE, NULL,
"Unknown system variable 'COLLATION_SERVER' on master, \
maybe it is a *VERY OLD MASTER*. *NOTE*: slave may experience \
inconsistency if replicated data deals with collation.");
@@ -1723,7 +1733,7 @@ be equal for the Statement-format replication to work";
goto slave_killed_err;
else if (is_network_error(err_code= mysql_errno(mysql)))
{
- mi->report(ERROR_LEVEL, err_code,
+ mi->report(ERROR_LEVEL, err_code, NULL,
"Get master TIME_ZONE failed with error: %s",
mysql_error(mysql));
goto network_err;
@@ -1731,7 +1741,7 @@ be equal for the Statement-format replication to work";
else if (err_code == ER_UNKNOWN_SYSTEM_VARIABLE)
{
/* We use ERROR_LEVEL to get the error logged to file */
- mi->report(ERROR_LEVEL, err_code,
+ mi->report(ERROR_LEVEL, err_code, NULL,
"MySQL master doesn't have a TIME_ZONE variable. Note that"
"if your timezone is not same between master and slave, your "
@@ -1763,15 +1773,35 @@ when it try to get the value of TIME_ZONE global variable from master.";
llstr((ulonglong) (mi->heartbeat_period*1000000000UL), llbuf);
sprintf(query, query_format, llbuf);
- if (mysql_real_query(mysql, query, strlen(query))
- && !check_io_slave_killed(mi, NULL))
+ DBUG_EXECUTE_IF("simulate_slave_heartbeat_network_error",
+ { static ulong dbug_count= 0;
+ if (++dbug_count < 3)
+ goto heartbeat_network_error;
+ });
+ if (mysql_real_query(mysql, query, strlen(query)))
{
- errmsg= "The slave I/O thread stops because SET @master_heartbeat_period "
- "on master failed.";
- err_code= ER_SLAVE_FATAL_ERROR;
- sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
- mysql_free_result(mysql_store_result(mysql));
- goto err;
+ if (check_io_slave_killed(mi, NULL))
+ goto slave_killed_err;
+
+ if (is_network_error(mysql_errno(mysql)))
+ {
+ IF_DBUG(heartbeat_network_error: , )
+ mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL,
+ "SET @master_heartbeat_period to master failed with error: %s",
+ mysql_error(mysql));
+ mysql_free_result(mysql_store_result(mysql));
+ goto network_err;
+ }
+ else
+ {
+ /* Fatal error */
+ errmsg= "The slave I/O thread stops because a fatal error is encountered "
+ "when it tries to SET @master_heartbeat_period on master.";
+ err_code= ER_SLAVE_FATAL_ERROR;
+ sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+ mysql_free_result(mysql_store_result(mysql));
+ goto err;
+ }
}
mysql_free_result(mysql_store_result(mysql));
}
@@ -1808,7 +1838,7 @@ when it try to get the value of TIME_ZONE global variable from master.";
if (global_system_variables.log_warnings > 1)
{
// this is tolerable as OM -> NS is supported
- mi->report(WARNING_LEVEL, mysql_errno(mysql),
+ mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL,
"Notifying master by %s failed with "
"error: %s", query, mysql_error(mysql));
}
@@ -1817,7 +1847,7 @@ when it try to get the value of TIME_ZONE global variable from master.";
{
if (is_network_error(mysql_errno(mysql)))
{
- mi->report(WARNING_LEVEL, mysql_errno(mysql),
+ mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL,
"Notifying master by %s failed with "
"error: %s", query, mysql_error(mysql));
mysql_free_result(mysql_store_result(mysql));
@@ -1853,7 +1883,7 @@ when it try to get the value of TIME_ZONE global variable from master.";
goto slave_killed_err;
else if (is_network_error(mysql_errno(mysql)))
{
- mi->report(WARNING_LEVEL, mysql_errno(mysql),
+ mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL,
"Get master BINLOG_CHECKSUM failed with error: %s", mysql_error(mysql));
goto network_err;
}
@@ -1890,7 +1920,7 @@ past_checksum:
err_code= mysql_errno(mysql);
if (is_network_error(err_code))
{
- mi->report(ERROR_LEVEL, err_code,
+ mi->report(ERROR_LEVEL, err_code, NULL,
"Setting master-side filtering of @@skip_replication failed "
"with error: %s", mysql_error(mysql));
goto network_err;
@@ -1934,7 +1964,7 @@ past_checksum:
err_code= mysql_errno(mysql);
if (is_network_error(err_code))
{
- mi->report(ERROR_LEVEL, err_code,
+ mi->report(ERROR_LEVEL, err_code, NULL,
"Setting @mariadb_slave_capability failed with error: %s",
mysql_error(mysql));
goto network_err;
@@ -2000,7 +2030,7 @@ after_set_capability:
err_code= mysql_errno(mysql);
if (is_network_error(err_code))
{
- mi->report(ERROR_LEVEL, err_code,
+ mi->report(ERROR_LEVEL, err_code, NULL,
"Setting @slave_connect_state failed with error: %s",
mysql_error(mysql));
goto network_err;
@@ -2033,7 +2063,7 @@ after_set_capability:
err_code= mysql_errno(mysql);
if (is_network_error(err_code))
{
- mi->report(ERROR_LEVEL, err_code,
+ mi->report(ERROR_LEVEL, err_code, NULL,
"Setting @slave_gtid_strict_mode failed with error: %s",
mysql_error(mysql));
goto network_err;
@@ -2066,7 +2096,7 @@ after_set_capability:
err_code= mysql_errno(mysql);
if (is_network_error(err_code))
{
- mi->report(ERROR_LEVEL, err_code,
+ mi->report(ERROR_LEVEL, err_code, NULL,
"Setting @slave_gtid_ignore_duplicates failed with "
"error: %s", mysql_error(mysql));
goto network_err;
@@ -2102,7 +2132,7 @@ after_set_capability:
err_code= mysql_errno(mysql);
if (is_network_error(err_code))
{
- mi->report(ERROR_LEVEL, err_code,
+ mi->report(ERROR_LEVEL, err_code, NULL,
"Setting @slave_until_gtid failed with error: %s",
mysql_error(mysql));
goto network_err;
@@ -2150,7 +2180,7 @@ after_set_capability:
goto slave_killed_err;
else if (is_network_error(mysql_errno(mysql)))
{
- mi->report(WARNING_LEVEL, mysql_errno(mysql),
+ mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL,
"Get master GTID position failed with error: %s", mysql_error(mysql));
goto network_err;
}
@@ -2180,7 +2210,7 @@ err:
if (master_res)
mysql_free_result(master_res);
DBUG_ASSERT(err_code != 0);
- mi->report(ERROR_LEVEL, err_code, "%s", err_buff);
+ mi->report(ERROR_LEVEL, err_code, NULL, "%s", err_buff);
DBUG_RETURN(1);
}
@@ -2201,6 +2231,7 @@ slave_killed_err:
static bool wait_for_relay_log_space(Relay_log_info* rli)
{
bool slave_killed=0;
+ bool ignore_log_space_limit;
Master_info* mi = rli->mi;
PSI_stage_info old_stage;
THD* thd = mi->io_thd;
@@ -2216,6 +2247,11 @@ static bool wait_for_relay_log_space(Relay_log_info* rli)
!rli->ignore_log_space_limit)
mysql_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
+ ignore_log_space_limit= rli->ignore_log_space_limit;
+ rli->ignore_log_space_limit= 0;
+
+ thd->EXIT_COND(&old_stage);
+
/*
Makes the IO thread read only one event at a time
until the SQL thread is able to purge the relay
@@ -2239,7 +2275,8 @@ static bool wait_for_relay_log_space(Relay_log_info* rli)
thread sleeps waiting for events.
*/
- if (rli->ignore_log_space_limit)
+
+ if (ignore_log_space_limit)
{
#ifndef DBUG_OFF
{
@@ -2261,11 +2298,8 @@ static bool wait_for_relay_log_space(Relay_log_info* rli)
mysql_mutex_unlock(&mi->data_lock);
rli->sql_force_rotate_relay= false;
}
-
- rli->ignore_log_space_limit= false;
}
- thd->EXIT_COND(&old_stage);
DBUG_RETURN(slave_killed);
}
@@ -2302,7 +2336,7 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
Rotate_log_event::DUP_NAME);
rli->ign_master_log_name_end[0]= 0;
if (unlikely(!(bool)rev))
- mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
+ mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE, NULL,
ER(ER_SLAVE_CREATE_EVENT_FAILURE),
"Rotate_event (out of memory?),"
" SHOW SLAVE STATUS may be inaccurate");
@@ -2313,7 +2347,7 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
Gtid_list_log_event::FLAG_IGN_GTIDS);
rli->ign_gtids.reset();
if (unlikely(!(bool)glev))
- mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
+ mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE, NULL,
ER(ER_SLAVE_CREATE_EVENT_FAILURE),
"Gtid_list_event (out of memory?),"
" gtid_slave_pos may be inaccurate");
@@ -2326,7 +2360,7 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
DBUG_PRINT("info",("writing a Rotate event to track down ignored events"));
rev->server_id= 0; // don't be ignored by slave SQL thread
if (unlikely(rli->relay_log.append(rev)))
- mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
+ mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"failed to write a Rotate event"
" to the relay log, SHOW SLAVE STATUS may be"
@@ -2339,7 +2373,7 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
glev->server_id= 0; // don't be ignored by slave SQL thread
glev->set_artificial_event(); // Don't mess up Exec_Master_Log_Pos
if (unlikely(rli->relay_log.append(glev)))
- mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
+ mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"failed to write a Gtid_list event to the relay log, "
"gtid_slave_pos may be inaccurate");
@@ -2424,7 +2458,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi,
char buf[256];
my_snprintf(buf, sizeof(buf), "%s (Errno: %d)", mysql_error(mysql),
mysql_errno(mysql));
- mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
+ mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE, NULL,
ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
}
DBUG_RETURN(1);
@@ -2829,7 +2863,8 @@ bool show_all_master_info(THD* thd)
if (send_show_master_info_header(thd, 1, gtid_pos.length()))
DBUG_RETURN(TRUE);
- if (!(elements= master_info_index->master_info_hash.records))
+ if (!master_info_index ||
+ !(elements= master_info_index->master_info_hash.records))
goto end;
/*
@@ -3093,7 +3128,8 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings)
Some errors are temporary in nature, such as
ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT.
*/
-static int has_temporary_error(THD *thd)
+int
+has_temporary_error(THD *thd)
{
DBUG_ENTER("has_temporary_error");
@@ -3274,7 +3310,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
if (error)
{
char buf[22];
- rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
+ rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR, rgi->gtid_info(),
"It was not possible to update the positions"
" of the relay log information: the slave may"
" be in an inconsistent state."
@@ -3290,7 +3326,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
Make sure we do not errorneously update gtid_slave_pos with a lingering
GTID from this failed event group (MDEV-4906).
*/
- rgi->gtid_sub_id= 0;
+ rgi->gtid_pending= false;
}
DBUG_RETURN(exec_res ? 1 : 0);
@@ -3501,9 +3537,6 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
if (opt_gtid_ignore_duplicates)
{
- serial_rgi->current_gtid.domain_id= gev->domain_id;
- serial_rgi->current_gtid.server_id= gev->server_id;
- serial_rgi->current_gtid.seq_no= gev->seq_no;
int res= rpl_global_gtid_slave_state.check_duplicate_gtid
(&serial_rgi->current_gtid, serial_rgi);
if (res < 0)
@@ -3616,7 +3649,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
DBUG_RETURN(exec_res);
}
mysql_mutex_unlock(&rli->data_lock);
- rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
+ rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE, NULL,
ER(ER_SLAVE_RELAY_LOG_READ_FAILURE), "\
Could not parse relay log event entry. The possible reasons are: the master's \
binary log is corrupted (you can check this by running 'mysqlbinlog' on the \
@@ -3711,7 +3744,7 @@ static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi,
*/
if (messages[SLAVE_RECON_MSG_COMMAND][0])
{
- mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
+ mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE, NULL,
ER(ER_SLAVE_MASTER_COM_FAILURE),
messages[SLAVE_RECON_MSG_COMMAND], buf);
}
@@ -3801,7 +3834,7 @@ pthread_handler_t handle_slave_io(void *arg)
/* Load the set of seen GTIDs, if we did not already. */
if (rpl_load_gtid_slave_state(thd))
{
- mi->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(),
+ mi->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL,
"Unable to load replication GTID slave state from mysql.%s: %s",
rpl_gtid_slave_state_table_name.str,
thd->get_stmt_da()->message());
@@ -3817,14 +3850,14 @@ pthread_handler_t handle_slave_io(void *arg)
if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
{
- mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER(ER_SLAVE_FATAL_ERROR), "Failed to run 'thread_start' hook");
goto err;
}
if (!(mi->mysql = mysql = mysql_init(NULL)))
{
- mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER(ER_SLAVE_FATAL_ERROR), "error in mysql_init()");
goto err;
}
@@ -4006,18 +4039,18 @@ Log entry on master is longer than slave_max_allowed_packet (%lu) on \
slave. If the entry is correct, restart the server with a higher value of \
slave_max_allowed_packet",
slave_max_allowed_packet);
- mi->report(ERROR_LEVEL, ER_NET_PACKET_TOO_LARGE,
+ mi->report(ERROR_LEVEL, ER_NET_PACKET_TOO_LARGE, NULL,
"%s", "Got a packet bigger than 'slave_max_allowed_packet' bytes");
goto err;
case ER_MASTER_FATAL_ERROR_READING_BINLOG:
- mi->report(ERROR_LEVEL, ER_MASTER_FATAL_ERROR_READING_BINLOG,
+ mi->report(ERROR_LEVEL, ER_MASTER_FATAL_ERROR_READING_BINLOG, NULL,
ER(ER_MASTER_FATAL_ERROR_READING_BINLOG),
mysql_error_number, mysql_error(mysql));
goto err;
case ER_OUT_OF_RESOURCES:
sql_print_error("\
Stopping slave I/O thread due to out-of-memory error from master");
- mi->report(ERROR_LEVEL, ER_OUT_OF_RESOURCES,
+ mi->report(ERROR_LEVEL, ER_OUT_OF_RESOURCES, NULL,
"%s", ER(ER_OUT_OF_RESOURCES));
goto err;
}
@@ -4034,7 +4067,7 @@ Stopping slave I/O thread due to out-of-memory error from master");
(thd, mi,(const char*)mysql->net.read_pos + 1,
event_len, &event_buf, &event_len)))
{
- mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER(ER_SLAVE_FATAL_ERROR),
"Failed to run 'after_read_event' hook");
goto err;
@@ -4045,7 +4078,7 @@ Stopping slave I/O thread due to out-of-memory error from master");
bool synced= 0;
if (queue_event(mi, event_buf, event_len))
{
- mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
+ mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"could not queue event from master");
goto err;
@@ -4054,7 +4087,7 @@ Stopping slave I/O thread due to out-of-memory error from master");
if (RUN_HOOK(binlog_relay_io, after_queue_event,
(thd, mi, event_buf, event_len, synced)))
{
- mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER(ER_SLAVE_FATAL_ERROR),
"Failed to run 'after_queue_event' hook");
goto err;
@@ -4151,9 +4184,10 @@ err_during_init:
// TODO: make rpl_status part of Master_info
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
mysql_mutex_lock(&LOCK_thread_count);
+ thd->unlink();
+ mysql_mutex_unlock(&LOCK_thread_count);
THD_CHECK_SENTRY(thd);
delete thd;
- mysql_mutex_unlock(&LOCK_thread_count);
mi->abort_slave= 0;
mi->slave_running= MYSQL_SLAVE_NOT_RUN;
mi->io_thd= 0;
@@ -4242,13 +4276,14 @@ end:
void
-slave_output_error_info(Relay_log_info *rli, THD *thd)
+slave_output_error_info(rpl_group_info *rgi, THD *thd)
{
/*
retrieve as much info as possible from the thd and, error
codes and warnings and print this to the error log as to
allow the user to locate the error
*/
+ Relay_log_info *rli= rgi->rli;
uint32 const last_errno= rli->last_error().number;
char llbuff[22];
@@ -4265,7 +4300,8 @@ slave_output_error_info(Relay_log_info *rli, THD *thd)
This function is reporting an error which was not reported
while executing exec_relay_log_event().
*/
- rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), "%s", errmsg);
+ rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(),
+ rgi->gtid_info(), "%s", errmsg);
}
else if (last_errno != thd->get_stmt_da()->sql_errno())
{
@@ -4344,6 +4380,7 @@ pthread_handler_t handle_slave_sql(void *arg)
char saved_master_log_name[FN_REFLEN];
my_off_t UNINIT_VAR(saved_log_pos);
my_off_t UNINIT_VAR(saved_master_log_pos);
+ String saved_skip_gtid_pos;
my_off_t saved_skip= 0;
Master_info *mi= ((Master_info*)arg);
Relay_log_info* rli = &mi->rli;
@@ -4394,7 +4431,7 @@ pthread_handler_t handle_slave_sql(void *arg)
will be stuck if we fail here
*/
mysql_cond_broadcast(&rli->start_cond);
- rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
"Failed during slave thread initialization");
goto err_during_init;
}
@@ -4446,16 +4483,20 @@ pthread_handler_t handle_slave_sql(void *arg)
mysql_mutex_unlock(&rli->log_space_lock);
serial_rgi->gtid_sub_id= 0;
+ serial_rgi->gtid_pending= false;
if (init_relay_log_pos(rli,
rli->group_relay_log_name,
rli->group_relay_log_pos,
1 /*need data lock*/, &errmsg,
1 /*look for a description_event*/))
{
- rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
"Error initializing relay log position: %s", errmsg);
goto err;
}
+ if (rli->alloc_inuse_relaylog(rli->group_relay_log_name))
+ goto err;
+
strcpy(rli->future_event_master_log_name, rli->group_master_log_name);
THD_CHECK_SENTRY(thd);
#ifndef DBUG_OFF
@@ -4510,7 +4551,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
if (check_temp_dir(rli->slave_patternload_file))
{
- rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(),
+ rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL,
"Unable to use slave's temporary directory %s - %s",
slave_load_tmpdir, thd->get_stmt_da()->message());
goto err;
@@ -4519,7 +4560,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
/* Load the set of seen GTIDs, if we did not already. */
if (rpl_load_gtid_slave_state(thd))
{
- rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(),
+ rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL,
"Unable to load replication GTID slave state from mysql.%s: %s",
rpl_gtid_slave_state_table_name.str,
thd->get_stmt_da()->message());
@@ -4538,7 +4579,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
execute_init_command(thd, &opt_init_slave, &LOCK_sys_init_slave);
if (thd->is_slave_error)
{
- rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(),
+ rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL,
"Slave SQL thread aborted. Can't execute init_slave query");
goto err;
}
@@ -4555,6 +4596,12 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
strmake_buf(saved_master_log_name, rli->group_master_log_name);
saved_log_pos= rli->group_relay_log_pos;
saved_master_log_pos= rli->group_master_log_pos;
+ if (mi->using_gtid != Master_info::USE_GTID_NO)
+ {
+ saved_skip_gtid_pos.append(STRING_WITH_LEN(", GTID '"));
+ rpl_append_gtid_state(&saved_skip_gtid_pos, false);
+ saved_skip_gtid_pos.append(STRING_WITH_LEN("'; "));
+ }
saved_skip= rli->slave_skip_counter;
}
if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS ||
@@ -4578,16 +4625,27 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
if (saved_skip && rli->slave_skip_counter == 0)
{
+ String tmp;
+ if (mi->using_gtid != Master_info::USE_GTID_NO)
+ {
+ tmp.append(STRING_WITH_LEN(", GTID '"));
+ rpl_append_gtid_state(&tmp, false);
+ tmp.append(STRING_WITH_LEN("'; "));
+ }
+
sql_print_information("'SQL_SLAVE_SKIP_COUNTER=%ld' executed at "
"relay_log_file='%s', relay_log_pos='%ld', master_log_name='%s', "
- "master_log_pos='%ld' and new position at "
+ "master_log_pos='%ld'%s and new position at "
"relay_log_file='%s', relay_log_pos='%ld', master_log_name='%s', "
- "master_log_pos='%ld' ",
+ "master_log_pos='%ld'%s ",
(ulong) saved_skip, saved_log_name, (ulong) saved_log_pos,
saved_master_log_name, (ulong) saved_master_log_pos,
+ saved_skip_gtid_pos.c_ptr_safe(),
rli->group_relay_log_name, (ulong) rli->group_relay_log_pos,
- rli->group_master_log_name, (ulong) rli->group_master_log_pos);
+ rli->group_master_log_name, (ulong) rli->group_master_log_pos,
+ tmp.c_ptr_safe());
saved_skip= 0;
+ saved_skip_gtid_pos.free();
}
if (exec_relay_log_event(thd, rli, serial_rgi))
@@ -4596,7 +4654,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
// do not scare the user if SQL thread was simply killed or stopped
if (!sql_slave_killed(serial_rgi))
{
- slave_output_error_info(rli, thd);
+ slave_output_error_info(serial_rgi, thd);
if (WSREP_ON && rli->last_error().number == ER_UNKNOWN_COM_ERROR)
{
wsrep_node_dropped= TRUE;
@@ -4791,7 +4849,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
xev.log_pos = cev->log_pos;
if (unlikely(mi->rli.relay_log.append(&xev)))
{
- mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
+ mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"error writing Exec_load event to relay log");
goto err;
@@ -4805,7 +4863,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
cev->block_len = num_bytes;
if (unlikely(mi->rli.relay_log.append(cev)))
{
- mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
+ mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"error writing Create_file event to relay log");
goto err;
@@ -4820,7 +4878,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
aev.log_pos = cev->log_pos;
if (unlikely(mi->rli.relay_log.append(&aev)))
{
- mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
+ mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"error writing Append_block event to relay log");
goto err;
@@ -4927,7 +4985,7 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf,
{
if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
{
- mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER(ER_SLAVE_FATAL_ERROR), "Memory allocation failed");
DBUG_RETURN(1);
}
@@ -5225,6 +5283,86 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
event_len - BINLOG_CHECKSUM_LEN : event_len,
mi->rli.relay_log.description_event_for_queue);
+ if (unlikely(mi->gtid_reconnect_event_skip_count) &&
+ unlikely(!mi->gtid_event_seen) &&
+ rev.is_artificial_event() &&
+ (mi->prev_master_id != mi->master_id ||
+ strcmp(rev.new_log_ident, mi->master_log_name) != 0))
+ {
+ /*
+ Artificial Rotate_log_event is the first event we receive at the start
+ of each master binlog file. It gives the name of the new binlog file.
+
+ Normally, we already have this name from the real rotate event at the
+ end of the previous binlog file (unless we are making a new connection
+ using GTID). But if the master server restarted/crashed, there is no
+ rotate event at the end of the prior binlog file, so the name is new.
+
+ We use this fact to handle a special case of master crashing. If the
+ master crashed while writing the binlog, it might end with a partial
+ event group lacking the COMMIT/XID event, which must be rolled
+ back. If the slave IO thread happens to get a disconnect in the middle
+ of exactly this event group, it will try to reconnect at the same GTID
+ and skip already fetched events. However, that GTID did not commit on
+ the master before the crash, so it does not really exist, and the
+ master will connect the slave at the next following GTID starting in
+ the next binlog. This could confuse the slave and make it mix the
+ start of one event group with the end of another.
+
+ But we detect this case here, by noticing the change of binlog name
+ which detects the missing rotate event at the end of the previous
+ binlog file. In this case, we reset the counters to make us not skip
+ the next event group, and queue an artificial Format Description
+ event. The previously fetched incomplete event group will then be
+ rolled back when the Format Description event is executed by the SQL
+ thread.
+
+ A similar case is if the reconnect somehow connects to a different
+ master server (like due to a network proxy or IP address takeover).
+ We detect this case by noticing a change of server_id and in this
+ case likewise roll back the partially received event group.
+ */
+ Format_description_log_event fdle(4);
+
+ if (mi->prev_master_id != mi->master_id)
+ sql_print_warning("The server_id of master server changed in the "
+ "middle of GTID %u-%u-%llu. Assuming a change of "
+ "master server, so rolling back the previously "
+ "received partial transaction. Expected: %lu, "
+ "received: %lu", mi->last_queued_gtid.domain_id,
+ mi->last_queued_gtid.server_id,
+ mi->last_queued_gtid.seq_no,
+ mi->prev_master_id, mi->master_id);
+ else if (strcmp(rev.new_log_ident, mi->master_log_name) != 0)
+ sql_print_warning("Unexpected change of master binlog file name in the "
+ "middle of GTID %u-%u-%llu, assuming that master has "
+ "crashed and rolling back the transaction. Expected: "
+ "'%s', received: '%s'",
+ mi->last_queued_gtid.domain_id,
+ mi->last_queued_gtid.server_id,
+ mi->last_queued_gtid.seq_no,
+ mi->master_log_name, rev.new_log_ident);
+
+ mysql_mutex_lock(log_lock);
+ if (likely(!fdle.write(rli->relay_log.get_log_file()) &&
+ !rli->relay_log.flush_and_sync(NULL)))
+ {
+ rli->relay_log.harvest_bytes_written(&rli->log_space_total);
+ }
+ else
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ mysql_mutex_unlock(log_lock);
+ goto err;
+ }
+ rli->relay_log.signal_update();
+ mysql_mutex_unlock(log_lock);
+
+ mi->gtid_reconnect_event_skip_count= 0;
+ mi->events_queued_since_last_gtid= 0;
+ }
+ mi->prev_master_id= mi->master_id;
+
if (unlikely(process_io_rotate(mi, &rev)))
{
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
@@ -5710,7 +5848,7 @@ err:
mysql_mutex_unlock(&mi->data_lock);
DBUG_PRINT("info", ("error: %d", error));
if (error)
- mi->report(ERROR_LEVEL, error, ER(error),
+ mi->report(ERROR_LEVEL, error, NULL, ER(error),
(error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
"could not queue event from master" :
error_msg.ptr());
@@ -5817,7 +5955,7 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
#ifndef DBUG_OFF
mi->events_till_disconnect = disconnect_slave_event_count;
#endif
- ulong client_flag= CLIENT_REMEMBER_OPTIONS;
+ ulong client_flag= 0;
if (opt_slave_compressed_protocol)
client_flag=CLIENT_COMPRESS; /* We will use compression */
@@ -5855,7 +5993,7 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
/* we disallow empty users */
if (mi->user == NULL || mi->user[0] == 0)
{
- mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER(ER_SLAVE_FATAL_ERROR),
"Invalid (empty) username when attempting to "
"connect to the master server. Connection attempt "
@@ -5872,7 +6010,7 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
{
last_errno=mysql_errno(mysql);
suppress_warnings= 0;
- mi->report(ERROR_LEVEL, last_errno,
+ mi->report(ERROR_LEVEL, last_errno, NULL,
"error %s to master '%s@%s:%d'"
" - retry-time: %d retries: %lu message: %s",
(reconnect ? "reconnecting" : "connecting"),
@@ -6404,6 +6542,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
DBUG_ASSERT(rli->cur_log_fd >= 0);
mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
rli->cur_log_fd = -1;
+ rli->last_inuse_relaylog->completed= true;
if (relay_log_purge)
{
@@ -6532,6 +6671,12 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
mysql_mutex_unlock(log_lock);
goto err;
}
+ if (rli->alloc_inuse_relaylog(rli->linfo.log_file_name))
+ {
+ if (!hot_log)
+ mysql_mutex_unlock(log_lock);
+ goto err;
+ }
if (!hot_log)
mysql_mutex_unlock(log_lock);
continue;
@@ -6547,6 +6692,8 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
&errmsg)) <0)
goto err;
+ if (rli->alloc_inuse_relaylog(rli->linfo.log_file_name))
+ goto err;
}
else
{
@@ -6685,7 +6832,7 @@ bool rpl_master_has_bug(const Relay_log_info *rli, uint bug_id, bool report,
" so slave stops; check error log on slave"
" for more info", MYF(0), bug_id);
// a verbose message for the error log
- rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
+ rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR, NULL,
"According to the master's version ('%s'),"
" it is probable that master suffers from this bug:"
" http://bugs.mysql.com/bug.php?id=%u"