summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc281
1 files changed, 171 insertions, 110 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index f75e867cc3b..54be2f07b3d 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -1,5 +1,5 @@
/* Copyright (c) 2000, 2017, Oracle and/or its affiliates.
- Copyright (c) 2009, 2020, MariaDB Corporation.
+ Copyright (c) 2009, 2021, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -171,7 +171,7 @@ static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi);
static int safe_reconnect(THD*, MYSQL*, Master_info*, bool);
static int connect_to_master(THD*, MYSQL*, Master_info*, bool, bool);
static Log_event* next_event(rpl_group_info* rgi, ulonglong *event_size);
-static int queue_event(Master_info* mi,const char* buf,ulong event_len);
+static int queue_event(Master_info *mi,const uchar *buf, ulong event_len);
static int terminate_slave_thread(THD *, mysql_mutex_t *, mysql_cond_t *,
volatile uint *, bool);
static bool check_io_slave_killed(Master_info *mi, const char *info);
@@ -315,9 +315,11 @@ build_gtid_pos_create_query(THD *thd, String *query,
LEX_CSTRING *engine_name)
{
bool err= false;
- err|= query->append(gtid_pos_table_definition1);
+ err|= query->append(gtid_pos_table_definition1,
+ sizeof(gtid_pos_table_definition1)-1);
err|= append_identifier(thd, query, table_name);
- err|= query->append(gtid_pos_table_definition2);
+ err|= query->append(gtid_pos_table_definition2,
+ sizeof(gtid_pos_table_definition2)-1);
err|= append_identifier(thd, query, engine_name);
return err;
}
@@ -348,8 +350,7 @@ gtid_pos_table_creation(THD *thd, plugin_ref engine, LEX_CSTRING *table_name)
err= parser_state.init(thd, thd->query(), thd->query_length());
if (err)
goto end;
- mysql_parse(thd, thd->query(), thd->query_length(), &parser_state,
- FALSE, FALSE);
+ mysql_parse(thd, thd->query(), thd->query_length(), &parser_state);
if (unlikely(thd->is_error()))
err= 1;
/* The warning is relevant to 10.3 and earlier. */
@@ -1767,7 +1768,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
{
errmsg= err_buff2;
snprintf(err_buff2, sizeof(err_buff2),
- "Master reported unrecognized MySQL version: %s",
+ "Master reported unrecognized MariaDB version: %s",
mysql->server_version);
err_code= ER_SLAVE_FATAL_ERROR;
sprintf(err_buff, ER_DEFAULT(err_code), err_buff2);
@@ -1783,7 +1784,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
case 2:
errmsg= err_buff2;
snprintf(err_buff2, sizeof(err_buff2),
- "Master reported unrecognized MySQL version: %s",
+ "Master reported unrecognized MariaDB version: %s",
mysql->server_version);
err_code= ER_SLAVE_FATAL_ERROR;
sprintf(err_buff, ER_DEFAULT(err_code), err_buff2);
@@ -1956,7 +1957,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
!mi->rli.replicate_same_server_id)
{
errmsg= "The slave I/O thread stops because master and slave have equal \
-MySQL server ids; these ids must be different for replication to work (or \
+MariaDB server ids; these ids must be different for replication to work (or \
the --replicate-same-server-id option must be used on slave but this does \
not always make sense; please check the manual before using it).";
err_code= ER_SLAVE_FATAL_ERROR;
@@ -2029,7 +2030,8 @@ maybe it is a *VERY OLD MASTER*.");
(master_res= mysql_store_result(mysql)) &&
(master_row= mysql_fetch_row(master_res)))
{
- if (strcmp(master_row[0], global_system_variables.collation_server->name))
+ if (strcmp(master_row[0],
+ global_system_variables.collation_server->coll_name.str))
{
errmsg= "The slave I/O thread stops because master and slave have \
different values for the COLLATION_SERVER global variable. The values must \
@@ -2116,7 +2118,7 @@ be equal for the Statement-format replication to work";
/* We use ERROR_LEVEL to get the error logged to file */
mi->report(ERROR_LEVEL, err_code, NULL,
- "MySQL master doesn't have a TIME_ZONE variable. Note that"
+ "MariaDB master doesn't have a TIME_ZONE variable. Note that"
"if your timezone is not same between master and slave, your "
"slave may get wrong data into timestamp columns");
}
@@ -2541,15 +2543,20 @@ after_set_capability:
char quote_buf[2*sizeof(mi->master_log_name)+1];
char str_buf[28+2*sizeof(mi->master_log_name)+10];
String query(str_buf, sizeof(str_buf), system_charset_info);
+ size_t quote_length;
+ my_bool overflow;
query.length(0);
- query.append("SELECT binlog_gtid_pos('");
- escape_quotes_for_mysql(&my_charset_bin, quote_buf, sizeof(quote_buf),
- mi->master_log_name, strlen(mi->master_log_name));
- query.append(quote_buf);
- query.append("',");
+ query.append(STRING_WITH_LEN("SELECT binlog_gtid_pos('"));
+ quote_length= escape_quotes_for_mysql(&my_charset_bin, quote_buf,
+ sizeof(quote_buf),
+ mi->master_log_name,
+ strlen(mi->master_log_name),
+ &overflow);
+ query.append(quote_buf, quote_length);
+ query.append(STRING_WITH_LEN("',"));
query.append_ulonglong(mi->master_log_pos);
- query.append(")");
+ query.append(')');
if (!mysql_real_query(mysql, query.c_ptr_safe(), query.length()) &&
(master_res= mysql_store_result(mysql)) &&
@@ -3115,7 +3122,20 @@ void show_master_info_get_fields(THD *thd, List<Item> *field_list,
}
/* Text for Slave_IO_Running */
-static const char *slave_running[]= { "No", "Connecting", "Preparing", "Yes" };
+static const LEX_CSTRING slave_running[]=
+{
+ { STRING_WITH_LEN("No") },
+ { STRING_WITH_LEN("Connecting") },
+ { STRING_WITH_LEN("Preparing") },
+ { STRING_WITH_LEN("Yes") }
+};
+
+static const LEX_CSTRING msg_yes= { STRING_WITH_LEN("Yes") };
+static const LEX_CSTRING msg_no= { STRING_WITH_LEN("No") };
+#ifndef HAVE_OPENSSL
+static const LEX_CSTRING msg_ignored= { STRING_WITH_LEN("Ignored") };
+#endif
+
static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
String *gtid_pos)
@@ -3129,6 +3149,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
Protocol *protocol= thd->protocol;
Rpl_filter *rpl_filter= mi->rpl_filter;
StringBuffer<256> tmp;
+ const char *msg;
protocol->prepare_for_resend();
@@ -3146,11 +3167,13 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
Show what the sql driver replication thread is doing
This is only meaningful if there is only one slave thread.
*/
- protocol->store(mi->rli.sql_driver_thd ?
- mi->rli.sql_driver_thd->get_proc_info() : "",
- &my_charset_bin);
+ msg= (mi->rli.sql_driver_thd ?
+ mi->rli.sql_driver_thd->get_proc_info() : "");
+ protocol->store_string_or_null(msg, &my_charset_bin);
}
- protocol->store(mi->io_thd ? mi->io_thd->get_proc_info() : "", &my_charset_bin);
+ msg= mi->io_thd ? mi->io_thd->get_proc_info() : "";
+ protocol->store_string_or_null(msg, &my_charset_bin);
+
mysql_mutex_unlock(&mi->run_lock);
mysql_mutex_lock(&mi->data_lock);
@@ -3159,19 +3182,22 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
mysql_mutex_lock(&mi->err_lock);
/* err_lock is to protect mi->rli.last_error() */
mysql_mutex_lock(&mi->rli.err_lock);
- protocol->store(mi->host, &my_charset_bin);
- protocol->store(mi->user, &my_charset_bin);
+ protocol->store_string_or_null(mi->host, &my_charset_bin);
+ protocol->store_string_or_null(mi->user, &my_charset_bin);
protocol->store((uint32) mi->port);
protocol->store((uint32) mi->connect_retry);
- protocol->store(mi->master_log_name, &my_charset_bin);
- protocol->store((ulonglong) mi->master_log_pos);
- protocol->store(mi->rli.group_relay_log_name +
- dirname_length(mi->rli.group_relay_log_name),
+ protocol->store(mi->master_log_name, strlen(mi->master_log_name),
&my_charset_bin);
+ protocol->store((ulonglong) mi->master_log_pos);
+ msg= (mi->rli.group_relay_log_name +
+ dirname_length(mi->rli.group_relay_log_name));
+ protocol->store(msg, strlen(msg), &my_charset_bin);
protocol->store((ulonglong) mi->rli.group_relay_log_pos);
- protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
- protocol->store(slave_running[mi->slave_running], &my_charset_bin);
- protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
+ protocol->store(mi->rli.group_master_log_name,
+ strlen(mi->rli.group_master_log_name),
+ &my_charset_bin);
+ protocol->store(&slave_running[mi->slave_running], &my_charset_bin);
+ protocol->store(mi->rli.slave_running ? &msg_yes : &msg_no, &my_charset_bin);
protocol->store(rpl_filter->get_do_db());
protocol->store(rpl_filter->get_ignore_db());
@@ -3185,29 +3211,30 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
protocol->store(&tmp);
protocol->store(mi->rli.last_error().number);
- protocol->store(mi->rli.last_error().message, &my_charset_bin);
+ protocol->store_string_or_null(mi->rli.last_error().message,
+ &my_charset_bin);
protocol->store((uint32) mi->rli.slave_skip_counter);
protocol->store((ulonglong) mi->rli.group_master_log_pos);
protocol->store((ulonglong) mi->rli.log_space_total);
- protocol->store(
- mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
- ( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
- ( mi->rli.until_condition==Relay_log_info::UNTIL_RELAY_POS? "Relay":
- "Gtid")), &my_charset_bin);
- protocol->store(mi->rli.until_log_name, &my_charset_bin);
+ msg= (mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None" :
+ (mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
+ (mi->rli.until_condition==Relay_log_info::UNTIL_RELAY_POS? "Relay":
+ "Gtid")));
+ protocol->store(msg, strlen(msg), &my_charset_bin);
+ protocol->store_string_or_null(mi->rli.until_log_name, &my_charset_bin);
protocol->store((ulonglong) mi->rli.until_log_pos);
#ifdef HAVE_OPENSSL
- protocol->store(mi->ssl? "Yes":"No", &my_charset_bin);
+ protocol->store(mi->ssl ? &msg_yes : &msg_no, &my_charset_bin);
#else
- protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin);
+ protocol->store(mi->ssl ? &msg_ignored: &msg_no, &my_charset_bin);
#endif
- protocol->store(mi->ssl_ca, &my_charset_bin);
- protocol->store(mi->ssl_capath, &my_charset_bin);
- protocol->store(mi->ssl_cert, &my_charset_bin);
- protocol->store(mi->ssl_cipher, &my_charset_bin);
- protocol->store(mi->ssl_key, &my_charset_bin);
+ protocol->store_string_or_null(mi->ssl_ca, &my_charset_bin);
+ protocol->store_string_or_null(mi->ssl_capath, &my_charset_bin);
+ protocol->store_string_or_null(mi->ssl_cert, &my_charset_bin);
+ protocol->store_string_or_null(mi->ssl_cipher, &my_charset_bin);
+ protocol->store_string_or_null(mi->ssl_key, &my_charset_bin);
/*
Seconds_Behind_Master: if SQL thread is running and I/O thread is
@@ -3262,27 +3289,30 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
{
protocol->store_null();
}
- protocol->store(mi->ssl_verify_server_cert? "Yes":"No", &my_charset_bin);
+ protocol->store(mi->ssl_verify_server_cert? &msg_yes : &msg_no,
+ &my_charset_bin);
// Last_IO_Errno
protocol->store(mi->last_error().number);
// Last_IO_Error
- protocol->store(mi->last_error().message, &my_charset_bin);
+ protocol->store_string_or_null(mi->last_error().message, &my_charset_bin);
// Last_SQL_Errno
protocol->store(mi->rli.last_error().number);
// Last_SQL_Error
- protocol->store(mi->rli.last_error().message, &my_charset_bin);
+ protocol->store_string_or_null(mi->rli.last_error().message,
+ &my_charset_bin);
// Replicate_Ignore_Server_Ids
prot_store_ids(thd, &mi->ignore_server_ids);
// Master_Server_id
protocol->store((uint32) mi->master_id);
// SQL_Delay
// Master_Ssl_Crl
- protocol->store(mi->ssl_crl, &my_charset_bin);
+ protocol->store_string_or_null(mi->ssl_crl, &my_charset_bin);
// Master_Ssl_Crlpath
- protocol->store(mi->ssl_crlpath, &my_charset_bin);
+ protocol->store_string_or_null(mi->ssl_crlpath, &my_charset_bin);
// Using_Gtid
- protocol->store(mi->using_gtid_astext(mi->using_gtid), &my_charset_bin);
+ protocol->store_string_or_null(mi->using_gtid_astext(mi->using_gtid),
+ &my_charset_bin);
// Gtid_IO_Pos
{
mi->gtid_current_pos.to_string(&tmp);
@@ -3313,7 +3343,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
else
protocol->store_null();
// Slave_SQL_Running_State
- protocol->store(slave_sql_running_state, &my_charset_bin);
+ protocol->store_string_or_null(slave_sql_running_state, &my_charset_bin);
protocol->store(mi->total_ddl_groups);
protocol->store(mi->total_non_trans_groups);
@@ -4300,6 +4330,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
DBUG_RETURN(1);
}
+ rli->last_seen_gtid= serial_rgi->current_gtid;
+ rli->last_trans_retry_count= serial_rgi->trans_retries;
if (opt_gtid_ignore_duplicates &&
rli->mi->using_gtid != Master_info::USE_GTID_NO)
{
@@ -4450,7 +4482,7 @@ Could not parse relay log event entry. The possible reasons are: the master's \
binary log is corrupted (you can check this by running 'mysqlbinlog' on the \
binary log), the slave's relay log is corrupted (you can check this by running \
'mysqlbinlog' on the relay log), a network problem, or a bug in the master's \
-or slave's MySQL code. If you want to check the master's binary log or slave's \
+or slave's MariaDB code. If you want to check the master's binary log or slave's \
relay log, you will be able to know their names by issuing 'SHOW SLAVE STATUS' \
on this slave.\
");
@@ -4773,6 +4805,8 @@ connected:
thd->set_command(COM_SLAVE_IO);
while (!io_slave_killed(mi))
{
+ const uchar *event_buf;
+
THD_STAGE_INFO(thd, stage_requesting_binlog_dump);
if (request_dump(thd, mysql, mi, &suppress_warnings))
{
@@ -4784,8 +4818,6 @@ connected:
goto connected;
}
- const char *event_buf;
-
mi->slave_running= MYSQL_SLAVE_RUN_READING;
DBUG_ASSERT(mi->last_error().number == 0);
ulonglong lastchecktime = my_hrtime().val;
@@ -4837,10 +4869,11 @@ Stopping slave I/O thread due to out-of-memory error from master");
retry_count=0; // ok event, reset retry counter
THD_STAGE_INFO(thd, stage_queueing_master_event_to_the_relay_log);
- event_buf= (const char*)mysql->net.read_pos + 1;
+ event_buf= mysql->net.read_pos + 1;
mi->semi_ack= 0;
if (repl_semisync_slave.
- slave_read_sync_header((const char*)mysql->net.read_pos + 1, event_len,
+ slave_read_sync_header((const uchar*) mysql->net.read_pos + 1,
+ event_len,
&(mi->semi_ack), &event_buf, &event_len))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
@@ -4962,20 +4995,17 @@ log space");
err:
// print the current replication position
if (mi->using_gtid == Master_info::USE_GTID_NO)
- {
sql_print_information("Slave I/O thread exiting, read up to log '%s', "
- "position %llu", IO_RPL_LOG_NAME, mi->master_log_pos);
- sql_print_information("master was %s:%d", mi->host, mi->port);
- }
+ "position %llu, master %s:%d", IO_RPL_LOG_NAME, mi->master_log_pos,
+ mi->host, mi->port);
else
{
StringBuffer<100> tmp;
mi->gtid_current_pos.to_string(&tmp);
sql_print_information("Slave I/O thread exiting, read up to log '%s', "
- "position %llu; GTID position %s",
+ "position %llu; GTID position %s, master %s:%d",
IO_RPL_LOG_NAME, mi->master_log_pos,
- tmp.c_ptr_safe());
- sql_print_information("master was %s:%d", mi->host, mi->port);
+ tmp.c_ptr_safe(), mi->host, mi->port);
}
repl_semisync_slave.slave_stop(mi);
thd->reset_query();
@@ -5321,6 +5351,7 @@ pthread_handler_t handle_slave_sql(void *arg)
serial_rgi->gtid_sub_id= 0;
serial_rgi->gtid_pending= false;
+ rli->last_seen_gtid= serial_rgi->current_gtid;
if (mi->using_gtid != Master_info::USE_GTID_NO && mi->using_parallel() &&
rli->restart_gtid_pos.count() > 0)
{
@@ -5578,9 +5609,9 @@ pthread_handler_t handle_slave_sql(void *arg)
tmp.append(STRING_WITH_LEN("'"));
}
sql_print_information("Slave SQL thread exiting, replication stopped in "
- "log '%s' at position %llu%s", RPL_LOG_NAME,
- rli->group_master_log_pos, tmp.c_ptr_safe());
- sql_print_information("master was %s:%d", mi->host, mi->port);
+ "log '%s' at position %llu%s, master: %s:%d", RPL_LOG_NAME,
+ rli->group_master_log_pos, tmp.c_ptr_safe(),
+ mi->host, mi->port);
}
#ifdef WITH_WSREP
wsrep_after_command_before_result(thd);
@@ -5923,13 +5954,13 @@ static int process_io_rotate(Master_info *mi, Rotate_log_event *rev)
Reads a 3.23 event and converts it to the slave's format. This code was
copied from MySQL 4.0.
*/
-static int queue_binlog_ver_1_event(Master_info *mi, const char *buf,
- ulong event_len)
+static int queue_binlog_ver_1_event(Master_info *mi, const uchar *buf,
+ ulong event_len)
{
const char *errmsg = 0;
ulong inc_pos;
bool ignore_event= 0;
- char *tmp_buf = 0;
+ uchar *tmp_buf = 0;
Relay_log_info *rli= &mi->rli;
DBUG_ENTER("queue_binlog_ver_1_event");
@@ -5939,8 +5970,8 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf,
*/
if ((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
{
- if (unlikely(!(tmp_buf=(char*)my_malloc(key_memory_binlog_ver_1_event,
- event_len+1,MYF(MY_WME)))))
+ if (unlikely(!(tmp_buf= (uchar*) my_malloc(key_memory_binlog_ver_1_event,
+ event_len+1, MYF(MY_WME)))))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER(ER_SLAVE_FATAL_ERROR), "Memory allocation failed");
@@ -5956,7 +5987,7 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf,
*/
tmp_buf[event_len++]=0;
int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
- buf = (const char*)tmp_buf;
+ buf= tmp_buf;
}
/*
This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
@@ -6043,8 +6074,8 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf,
Reads a 4.0 event and converts it to the slave's format. This code was copied
from queue_binlog_ver_1_event(), with some affordable simplifications.
*/
-static int queue_binlog_ver_3_event(Master_info *mi, const char *buf,
- ulong event_len)
+static int queue_binlog_ver_3_event(Master_info *mi, const uchar *buf,
+ ulong event_len)
{
const char *errmsg = 0;
ulong inc_pos;
@@ -6054,7 +6085,7 @@ static int queue_binlog_ver_3_event(Master_info *mi, const char *buf,
/* read_log_event() will adjust log_pos to be end_log_pos */
Log_event *ev=
- Log_event::read_log_event(buf,event_len, &errmsg,
+ Log_event::read_log_event(buf, event_len, &errmsg,
mi->rli.relay_log.description_event_for_queue, 0);
if (unlikely(!ev))
{
@@ -6109,13 +6140,11 @@ err:
setup with 3.23 master or 4.0 master
*/
-static int queue_old_event(Master_info *mi, const char *buf,
- ulong event_len)
+static int queue_old_event(Master_info *mi, const uchar *buf, ulong event_len)
{
DBUG_ENTER("queue_old_event");
- switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
- {
+ switch (mi->rli.relay_log.description_event_for_queue->binlog_version) {
case 1:
DBUG_RETURN(queue_binlog_ver_1_event(mi,buf,event_len));
case 3:
@@ -6137,7 +6166,7 @@ static int queue_old_event(Master_info *mi, const char *buf,
any >=5.0.0 format.
*/
-static int queue_event(Master_info* mi,const char* buf, ulong event_len)
+static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
{
int error= 0;
StringBuffer<1024> error_msg;
@@ -6152,11 +6181,20 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
rpl_gtid event_gtid;
static uint dbug_rows_event_count __attribute__((unused))= 0;
bool is_compress_event = false;
- char* new_buf = NULL;
- char new_buf_arr[4096];
+ uchar *new_buf = NULL;
+ uchar new_buf_arr[4096];
bool is_malloc = false;
bool is_rows_event= false;
/*
+ The flag has replicate_same_server_id semantics and is raised to accept
+ a same-server-id event group by the gtid strict mode semisync slave.
+ Own server-id events can appear as result of this server crash-recovery:
+ the transaction was created on this server then being master, got replicated
+ elsewhere right before the crash before commit;
+ finally at recovery the transaction gets evicted from the server's binlog.
+ */
+ bool do_accept_own_server_id;
+ /*
FD_q must have been prepared for the first R_a event
inside get_master_version_and_clock()
Show-up of FD:s affects checksum_alg at once because
@@ -6166,8 +6204,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF ?
mi->checksum_alg_before_fd : mi->rli.relay_log.relay_log_checksum_alg;
- char *save_buf= NULL; // needed for checksumming the fake Rotate event
- char rot_buf[LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN + FN_REFLEN];
+ const uchar *save_buf= NULL; // needed for checksumming the fake Rotate event
+ uchar rot_buf[LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN + FN_REFLEN];
DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
@@ -6201,9 +6239,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
// Emulate the network corruption
DBUG_EXECUTE_IF("corrupt_queue_event",
- if ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT)
+ if (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT)
{
- char *debug_event_buf_c = (char*) buf;
+ uchar *debug_event_buf_c= const_cast<uchar*>(buf);
int debug_cor_pos = rand() % (event_len - BINLOG_CHECKSUM_LEN);
debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos];
DBUG_PRINT("info", ("Corrupt the event at queue_event: byte on position %d", debug_cor_pos));
@@ -6211,15 +6249,16 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
);
- if (event_checksum_test((uchar *) buf, event_len, checksum_alg))
+ if (event_checksum_test((uchar*) buf, event_len, checksum_alg))
{
error= ER_NETWORK_READ_EVENT_CHECKSUM_FAILURE;
unlock_data_lock= FALSE;
goto err;
}
+ DBUG_ASSERT(((uchar) buf[FLAGS_OFFSET] & LOG_EVENT_ACCEPT_OWN_F) == 0);
if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
- (uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
+ buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
DBUG_RETURN(queue_old_event(mi,buf,event_len));
#ifdef ENABLED_DEBUG_SYNC
@@ -6243,9 +6282,11 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
dbug_rows_event_count = 0;
};);
#endif
+ s_id= uint4korr(buf + SERVER_ID_OFFSET);
+
mysql_mutex_lock(&mi->data_lock);
- switch ((uchar)buf[EVENT_TYPE_OFFSET]) {
+ switch (buf[EVENT_TYPE_OFFSET]) {
case STOP_EVENT:
/*
We needn't write this event to the relay log. Indeed, it just indicates a
@@ -6380,7 +6421,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mi->rli.relay_log.relay_log_checksum_alg);
/* the first one */
DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF);
- save_buf= (char *) buf;
+ save_buf= buf;
buf= rot_buf;
}
else
@@ -6400,7 +6441,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mi->rli.relay_log.relay_log_checksum_alg);
/* the first one */
DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF);
- save_buf= (char *) buf;
+ save_buf= buf;
buf= rot_buf;
}
/*
@@ -6485,7 +6526,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
error= ER_SLAVE_HEARTBEAT_FAILURE;
error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
- error_msg.append(hb.get_log_ident(), (uint) hb.get_ident_len());
+ error_msg.append((char*) hb.get_log_ident(), (uint) hb.get_ident_len());
error_msg.append(STRING_WITH_LEN(" log_pos "));
error_msg.append_ulonglong(hb.log_pos);
goto err;
@@ -6511,7 +6552,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
error= ER_SLAVE_HEARTBEAT_FAILURE;
error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
- error_msg.append(hb.get_log_ident(), (uint) hb.get_ident_len());
+ error_msg.append((char*) hb.get_log_ident(), (uint) hb.get_ident_len());
error_msg.append(STRING_WITH_LEN(" log_pos "));
error_msg.append_ulonglong(hb.log_pos);
goto err;
@@ -6753,7 +6794,7 @@ dbug_gtid_accept:
if (query_event_uncompress(rli->relay_log.description_event_for_queue,
checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
buf, event_len, new_buf_arr, sizeof(new_buf_arr),
- &is_malloc, (char **)&new_buf, &event_len))
+ &is_malloc, &new_buf, &event_len))
{
char llbuf[22];
error = ER_BINLOG_UNCOMPRESS_ERROR;
@@ -6776,8 +6817,9 @@ dbug_gtid_accept:
{
if (row_log_event_uncompress(rli->relay_log.description_event_for_queue,
checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
- buf, event_len, new_buf_arr, sizeof(new_buf_arr),
- &is_malloc, (char **)&new_buf, &event_len))
+ buf, event_len, new_buf_arr,
+ sizeof(new_buf_arr),
+ &is_malloc, &new_buf, &event_len))
{
char llbuf[22];
error = ER_BINLOG_UNCOMPRESS_ERROR;
@@ -6864,7 +6906,8 @@ dbug_gtid_accept:
{
if ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT ||
((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */
- Query_log_event::peek_is_commit_rollback(buf, event_len,
+ Query_log_event::peek_is_commit_rollback(buf,
+ event_len,
checksum_alg)))
{
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
@@ -6922,6 +6965,10 @@ dbug_gtid_accept:
break;
}
+ do_accept_own_server_id= (s_id == global_system_variables.server_id
+ && rpl_semi_sync_slave_enabled && opt_gtid_strict_mode
+ && mi->using_gtid != Master_info::USE_GTID_NO);
+
/*
Integrity of Rows- event group check.
A sequence of Rows- events must end with STMT_END_F flagged one.
@@ -6967,7 +7014,6 @@ dbug_gtid_accept:
*/
mysql_mutex_lock(log_lock);
- s_id= uint4korr(buf + SERVER_ID_OFFSET);
/*
Write the event to the relay log, unless we reconnected in the middle
of an event group and now need to skip the initial part of the group that
@@ -7012,7 +7058,8 @@ dbug_gtid_accept:
}
else
if ((s_id == global_system_variables.server_id &&
- !mi->rli.replicate_same_server_id) ||
+ !(mi->rli.replicate_same_server_id ||
+ do_accept_own_server_id)) ||
event_that_should_be_ignored(buf) ||
/*
the following conjunction deals with IGNORE_SERVER_IDS, if set
@@ -7072,6 +7119,19 @@ dbug_gtid_accept:
}
else
{
+ if (do_accept_own_server_id)
+ {
+ int2store(const_cast<uchar*>(buf + FLAGS_OFFSET),
+ uint2korr(buf + FLAGS_OFFSET) | LOG_EVENT_ACCEPT_OWN_F);
+ if (checksum_alg != BINLOG_CHECKSUM_ALG_OFF)
+ {
+ ha_checksum crc= 0;
+
+ crc= my_checksum(crc, (const uchar *) buf,
+ event_len - BINLOG_CHECKSUM_LEN);
+ int4store(&buf[event_len - BINLOG_CHECKSUM_LEN], crc);
+ }
+ }
if (likely(!rli->relay_log.write_event_buffer((uchar*)buf, event_len)))
{
mi->master_log_pos+= inc_pos;
@@ -7315,16 +7375,16 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
charset, then set client charset to 'latin1' (default client charset).
*/
if (is_supported_parser_charset(default_charset_info))
- mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
+ mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->cs_name.str);
else
{
sql_print_information("'%s' can not be used as client character set. "
"'%s' will be used as default client character set "
"while connecting to master.",
- default_charset_info->csname,
- default_client_charset_info->csname);
+ default_charset_info->cs_name.str,
+ default_client_charset_info->cs_name.str);
mysql_options(mysql, MYSQL_SET_CHARSET_NAME,
- default_client_charset_info->csname);
+ default_client_charset_info->cs_name.str);
}
/* This one is not strictly needed but we have it here for completeness */
@@ -7471,7 +7531,8 @@ MYSQL *rpl_connect_master(MYSQL *mysql)
}
#endif
- mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
+ mysql_options(mysql, MYSQL_SET_CHARSET_NAME,
+ default_charset_info->cs_name.str);
/* This one is not strictly needed but we have it here for completeness */
mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
@@ -8181,19 +8242,19 @@ bool rpl_master_erroneous_autoinc(THD *thd)
}
-static bool get_row_event_stmt_end(const char* buf,
+static bool get_row_event_stmt_end(const uchar *buf,
const Format_description_log_event *fdle)
{
uint8 const common_header_len= fdle->common_header_len;
Log_event_type event_type= (Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET];
uint8 const post_header_len= fdle->post_header_len[event_type-1];
- const char *flag_start= buf + common_header_len;
+ const uchar *flag_start= buf + common_header_len;
/*
The term 4 below signifies that master is of 'an intermediate source', see
Rows_log_event::Rows_log_event.
*/
- flag_start += RW_MAPID_OFFSET + ((post_header_len == 6) ? 4 : RW_FLAGS_OFFSET);
+ flag_start += RW_MAPID_OFFSET + ((post_header_len == 6) ? 4 : RW_FLAGS_OFFSET);
return (uint2korr(flag_start) & Rows_log_event::STMT_END_F) != 0;
}
@@ -8218,8 +8279,8 @@ void Rows_event_tracker::reset()
well as the end-of-statement status of the last one.
*/
-void Rows_event_tracker::update(const char* file_name, my_off_t pos,
- const char* buf,
+void Rows_event_tracker::update(const char *file_name, my_off_t pos,
+ const uchar *buf,
const Format_description_log_event *fdle)
{
DBUG_ENTER("Rows_event_tracker::update");