diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 305 |
1 files changed, 293 insertions, 12 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 63b7ce715c9..b042d463b1e 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -937,6 +937,48 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) } /* + FD_q's (A) is set initially from RL's (A): FD_q.(A) := RL.(A). + It's necessary to adjust FD_q.(A) at this point because in the following + course FD_q is going to be dumped to RL. + Generally FD_q is derived from a received FD_m (roughly FD_q := FD_m) + in queue_event and the master's (A) is installed. + At one step with the assignment the Relay-Log's checksum alg is set to + a new value: RL.(A) := FD_q.(A). If the slave service is stopped + the last time assigned RL.(A) will be passed over to the restarting + service (to the current execution point). + RL.A is a "codec" to verify checksum in queue_event() almost all the time + the first fake Rotate event. + Starting from this point IO thread will executes the following checksum + warmup sequence of actions: + + FD_q.A := RL.A, + A_m^0 := master.@@global.binlog_checksum, + {queue_event(R_f): verifies(R_f, A_m^0)}, + {queue_event(FD_m): verifies(FD_m, FD_m.A), dump(FD_q), rotate(RL), + FD_q := FD_m, RL.A := FD_q.A)} + + See legends definition on MYSQL_BIN_LOG::relay_log_checksum_alg + docs lines (binlog.h). + In above A_m^0 - the value of master's + @@binlog_checksum determined in the upcoming handshake (stored in + mi->checksum_alg_before_fd). + + + After the warm-up sequence IO gets to "normal" checksum verification mode + to use RL.A in + + {queue_event(E_m): verifies(E_m, RL.A)} + + until it has received a new FD_m. + */ + mi->rli.relay_log.description_event_for_queue->checksum_alg= + mi->rli.relay_log.relay_log_checksum_alg; + + DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->checksum_alg != + BINLOG_CHECKSUM_ALG_UNDEF); + DBUG_ASSERT(mi->rli.relay_log.relay_log_checksum_alg != + BINLOG_CHECKSUM_ALG_UNDEF); + /* Compare the master and slave's clock. Do not die if master's clock is unavailable (very old master not supporting UNIX_TIMESTAMP()?). */ @@ -1175,6 +1217,103 @@ when it try to get the value of TIME_ZONE global variable from master."; } } + /* + Querying if master is capable to checksum and notifying it about own + CRC-awareness. The master's side instant value of @@global.binlog_checksum + is stored in the dump thread's uservar area as well as cached locally + to become known in consensus by master and slave. + */ + DBUG_EXECUTE_IF("simulate_slave_unaware_checksum", + mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_OFF; + goto past_checksum;); + { + int rc; + const char query[]= "SET @master_binlog_checksum= @@global.binlog_checksum"; + master_res= NULL; + mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_UNDEF; //initially undefined + /* + @c checksum_alg_before_fd is queried from master in this block. + If master is old checksum-unaware the value stays undefined. + Once the first FD will be received its alg descriptor will replace + the being queried one. + */ + rc= mysql_real_query(mysql, query, strlen(query)); + if (rc != 0) + { + if (check_io_slave_killed(mi->io_thd, mi, NULL)) + goto slave_killed_err; + + if (mysql_errno(mysql) == ER_UNKNOWN_SYSTEM_VARIABLE) + { + // this is tolerable as OM -> NS is supported + mi->report(WARNING_LEVEL, mysql_errno(mysql), + "Notifying master by %s failed with " + "error: %s", query, mysql_error(mysql)); + } + else + { + if (is_network_error(mysql_errno(mysql))) + { + mi->report(WARNING_LEVEL, mysql_errno(mysql), + "Notifying master by %s failed with " + "error: %s", query, mysql_error(mysql)); + mysql_free_result(mysql_store_result(mysql)); + goto network_err; + } + else + { + errmsg= "The slave I/O thread stops because a fatal error is encountered " + "when it tried to SET @master_binlog_checksum on master."; + err_code= ER_SLAVE_FATAL_ERROR; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + mysql_free_result(mysql_store_result(mysql)); + goto err; + } + } + } + else + { + mysql_free_result(mysql_store_result(mysql)); + if (!mysql_real_query(mysql, + STRING_WITH_LEN("SELECT @master_binlog_checksum")) && + (master_res= mysql_store_result(mysql)) && + (master_row= mysql_fetch_row(master_res)) && + (master_row[0] != NULL)) + { + mi->checksum_alg_before_fd= (uint8) + find_type(master_row[0], &binlog_checksum_typelib, 1) - 1; + // valid outcome is either of + DBUG_ASSERT(mi->checksum_alg_before_fd == BINLOG_CHECKSUM_ALG_OFF || + mi->checksum_alg_before_fd == BINLOG_CHECKSUM_ALG_CRC32); + } + else if (check_io_slave_killed(mi->io_thd, mi, NULL)) + goto slave_killed_err; + else if (is_network_error(mysql_errno(mysql))) + { + mi->report(WARNING_LEVEL, mysql_errno(mysql), + "Get master BINLOG_CHECKSUM failed with error: %s", mysql_error(mysql)); + goto network_err; + } + else + { + errmsg= "The slave I/O thread stops because a fatal error is encountered " + "when it tried to SELECT @master_binlog_checksum."; + err_code= ER_SLAVE_FATAL_ERROR; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + mysql_free_result(mysql_store_result(mysql)); + goto err; + } + } + if (master_res) + { + mysql_free_result(master_res); + master_res= NULL; + } + } + +#ifndef DBUG_OFF +past_checksum: +#endif err: if (errmsg) { @@ -1191,6 +1330,11 @@ network_err: if (master_res) mysql_free_result(master_res); DBUG_RETURN(2); + +slave_killed_err: + if (master_res) + mysql_free_result(master_res); + DBUG_RETURN(2); } /* @@ -3384,10 +3528,15 @@ static int process_io_rotate(Master_info *mi, Rotate_log_event *rev) */ if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4) { + DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->checksum_alg == + mi->rli.relay_log.relay_log_checksum_alg); + delete mi->rli.relay_log.description_event_for_queue; /* start from format 3 (MySQL 4.0) again */ mi->rli.relay_log.description_event_for_queue= new Format_description_log_event(3); + mi->rli.relay_log.description_event_for_queue->checksum_alg= + mi->rli.relay_log.relay_log_checksum_alg; } /* Rotate the relay log makes binlog format detection easier (at next slave @@ -3440,8 +3589,9 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf, Append_block/Exec_load (the SQL thread needs the data, as that thread is not connected to the master). */ - Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg, - mi->rli.relay_log.description_event_for_queue); + Log_event *ev= + Log_event::read_log_event(buf, event_len, &errmsg, + mi->rli.relay_log.description_event_for_queue, 0); if (unlikely(!ev)) { sql_print_error("Read invalid event from master: '%s',\ @@ -3528,8 +3678,9 @@ static int queue_binlog_ver_3_event(Master_info *mi, const char *buf, DBUG_ENTER("queue_binlog_ver_3_event"); /* read_log_event() will adjust log_pos to be end_log_pos */ - Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg, - mi->rli.relay_log.description_event_for_queue); + Log_event *ev= + Log_event::read_log_event(buf,event_len, &errmsg, + mi->rli.relay_log.description_event_for_queue, 0); if (unlikely(!ev)) { sql_print_error("Read invalid event from master: '%s',\ @@ -3555,6 +3706,7 @@ static int queue_binlog_ver_3_event(Master_info *mi, const char *buf, inc_pos= event_len; break; } + if (unlikely(rli->relay_log.append(ev))) { delete ev; @@ -3616,7 +3768,68 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ulong inc_pos; Relay_log_info *rli= &mi->rli; pthread_mutex_t *log_lock= rli->relay_log.get_log_lock(); + bool unlock_data_lock= TRUE; + /* + FD_q must have been prepared for the first R_a event + inside get_master_version_and_clock() + Show-up of FD:s affects checksum_alg at once because + that changes FD_queue. + */ + uint8 checksum_alg= mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF ? + mi->checksum_alg_before_fd : + mi->rli.relay_log.relay_log_checksum_alg; + + char *save_buf= NULL; // needed for checksumming the fake Rotate event + char rot_buf[LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN + FN_REFLEN]; + + DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_OFF || + checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || + checksum_alg == BINLOG_CHECKSUM_ALG_CRC32); + DBUG_ENTER("queue_event"); + /* + FD_queue checksum alg description does not apply in a case of + FD itself. The one carries both parts of the checksum data. + */ + if (buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) + { + checksum_alg= get_checksum_alg(buf, event_len); + } + else if (buf[EVENT_TYPE_OFFSET] == START_EVENT_V3) + { + // checksum behaviour is similar to the pre-checksum FD handling + mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_UNDEF; + mi->rli.relay_log.description_event_for_queue->checksum_alg= + mi->rli.relay_log.relay_log_checksum_alg= checksum_alg= + BINLOG_CHECKSUM_ALG_OFF; + } + + // does not hold always because of old binlog can work with NM + // DBUG_ASSERT(checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); + + // should hold unless manipulations with RL. Tests that do that + // will have to refine the clause. + DBUG_ASSERT(mi->rli.relay_log.relay_log_checksum_alg != + BINLOG_CHECKSUM_ALG_UNDEF); + + // Emulate the network corruption + DBUG_EXECUTE_IF("corrupt_queue_event", + if (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT) + { + char *debug_event_buf_c = (char*) buf; + int debug_cor_pos = rand() % (event_len - BINLOG_CHECKSUM_LEN); + debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos]; + DBUG_PRINT("info", ("Corrupt the event at queue_event: byte on position %d", debug_cor_pos)); + DBUG_SET("-d,corrupt_queue_event"); + } + ); + + if (event_checksum_test((uchar *) buf, event_len, checksum_alg)) + { + error= ER_NETWORK_READ_EVENT_CHECKSUM_FAILURE; + unlock_data_lock= FALSE; + goto err; + } LINT_INIT(inc_pos); @@ -3644,12 +3857,67 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) goto err; case ROTATE_EVENT: { - Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue); - if (unlikely(process_io_rotate(mi,&rev))) + Rotate_log_event rev(buf, checksum_alg != BINLOG_CHECKSUM_ALG_OFF ? + event_len - BINLOG_CHECKSUM_LEN : event_len, + mi->rli.relay_log.description_event_for_queue); + + if (unlikely(process_io_rotate(mi, &rev))) { error= 1; goto err; } + /* + Checksum special cases for the fake Rotate (R_f) event caused by the protocol + of events generation and serialization in RL where Rotate of master is + queued right next to FD of slave. + Since it's only FD that carries the alg desc of FD_s has to apply to R_m. + Two special rules apply only to the first R_f which comes in before any FD_m. + The 2nd R_f should be compatible with the FD_s that must have taken over + the last seen FD_m's (A). + + RSC_1: If OM \and fake Rotate \and slave is configured to + to compute checksum for its first FD event for RL + the fake Rotate gets checksummed here. + */ + if (uint4korr(&buf[0]) == 0 && checksum_alg == BINLOG_CHECKSUM_ALG_OFF && + mi->rli.relay_log.relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_OFF) + { + ha_checksum rot_crc= my_checksum(0L, NULL, 0); + event_len += BINLOG_CHECKSUM_LEN; + memcpy(rot_buf, buf, event_len - BINLOG_CHECKSUM_LEN); + int4store(&rot_buf[EVENT_LEN_OFFSET], + uint4korr(&rot_buf[EVENT_LEN_OFFSET]) + BINLOG_CHECKSUM_LEN); + rot_crc= my_checksum(rot_crc, (const uchar *) rot_buf, + event_len - BINLOG_CHECKSUM_LEN); + int4store(&rot_buf[event_len - BINLOG_CHECKSUM_LEN], rot_crc); + DBUG_ASSERT(event_len == uint4korr(&rot_buf[EVENT_LEN_OFFSET])); + DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->checksum_alg == + mi->rli.relay_log.relay_log_checksum_alg); + /* the first one */ + DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF); + save_buf= (char *) buf; + buf= rot_buf; + } + else + /* + RSC_2: If NM \and fake Rotate \and slave does not compute checksum + the fake Rotate's checksum is stripped off before relay-logging. + */ + if (uint4korr(&buf[0]) == 0 && checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + mi->rli.relay_log.relay_log_checksum_alg == BINLOG_CHECKSUM_ALG_OFF) + { + event_len -= BINLOG_CHECKSUM_LEN; + memcpy(rot_buf, buf, event_len); + int4store(&rot_buf[EVENT_LEN_OFFSET], + uint4korr(&rot_buf[EVENT_LEN_OFFSET]) - BINLOG_CHECKSUM_LEN); + DBUG_ASSERT(event_len == uint4korr(&rot_buf[EVENT_LEN_OFFSET])); + DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->checksum_alg == + mi->rli.relay_log.relay_log_checksum_alg); + /* the first one */ + DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF); + save_buf= (char *) buf; + buf= rot_buf; + } /* Now the I/O thread has just changed its mi->master_log_name, so incrementing mi->master_log_pos is nonsense. @@ -3670,15 +3938,24 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) */ Format_description_log_event* tmp; const char* errmsg; + // mark it as undefined that is irrelevant anymore + mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_UNDEF; if (!(tmp= (Format_description_log_event*) Log_event::read_log_event(buf, event_len, &errmsg, - mi->rli.relay_log.description_event_for_queue))) + mi->rli.relay_log.description_event_for_queue, + 1))) { error= 2; goto err; } delete mi->rli.relay_log.description_event_for_queue; mi->rli.relay_log.description_event_for_queue= tmp; + if (tmp->checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF) + tmp->checksum_alg= BINLOG_CHECKSUM_ALG_OFF; + + /* installing new value of checksum Alg for relay log */ + mi->rli.relay_log.relay_log_checksum_alg= tmp->checksum_alg; + /* Though this does some conversion to the slave's format, this will preserve the master's binlog format version, and number of event types. @@ -3755,12 +4032,15 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) else error= 3; rli->ign_master_log_name_end[0]= 0; // last event is not ignored + if (save_buf != NULL) + buf= save_buf; } pthread_mutex_unlock(log_lock); err: - pthread_mutex_unlock(&mi->data_lock); + if (unlock_data_lock) + pthread_mutex_unlock(&mi->data_lock); DBUG_PRINT("info", ("error: %d", error)); DBUG_RETURN(error); } @@ -4150,8 +4430,9 @@ static Log_event* next_event(Relay_log_info* rli) But if the relay log is created by new_file(): then the solution is: MYSQL_BIN_LOG::open() will write the buffered description event. */ - if ((ev=Log_event::read_log_event(cur_log,0, - rli->relay_log.description_event_for_exec))) + if ((ev= Log_event::read_log_event(cur_log,0, + rli->relay_log.description_event_for_exec, + opt_slave_sql_verify_checksum))) { DBUG_ASSERT(thd==rli->sql_thd); @@ -4552,9 +4833,9 @@ bool rpl_master_has_bug(const Relay_log_info *rli, uint bug_id, bool report, {37426, { 5, 1, 0 }, { 5, 1, 26 } }, }; const uchar *master_ver= - rli->relay_log.description_event_for_exec->server_version_split; + rli->relay_log.description_event_for_exec->server_version_split.ver; - DBUG_ASSERT(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3); + DBUG_ASSERT(sizeof(rli->relay_log.description_event_for_exec->server_version_split.ver) == 3); for (uint i= 0; i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++) |