diff options
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r-- | sql/sql_repl.cc | 191 |
1 files changed, 179 insertions, 12 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 596f0f5c1e6..8b6ba0e44e5 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -28,6 +28,9 @@ my_bool opt_sporadic_binlog_dump_fail = 0; static int binlog_dump_count = 0; #endif +extern TYPELIB binlog_checksum_typelib; + + /* fake_rotate_event() builds a fake (=which does not exist physically in any binlog) Rotate event, which contains the name of the binlog we are going to @@ -47,10 +50,21 @@ static int binlog_dump_count = 0; */ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, - ulonglong position, const char** errmsg) + ulonglong position, const char** errmsg, + uint8 checksum_alg_arg) { DBUG_ENTER("fake_rotate_event"); char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100]; + + /* + this Rotate is to be sent with checksum if and only if + slave's get_master_version_and_clock time handshake value + of master's @@global.binlog_checksum was TRUE + */ + + my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF && + checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF; + /* 'when' (the timestamp) is set to 0 so that slave could distinguish between real and fake Rotate events (if necessary) @@ -60,7 +74,8 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, char* p = log_file_name+dirname_length(log_file_name); uint ident_len = (uint) strlen(p); - ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN; + ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN + + (do_checksum ? BINLOG_CHECKSUM_LEN : 0); int4store(header + SERVER_ID_OFFSET, server_id); int4store(header + EVENT_LEN_OFFSET, event_len); int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F); @@ -71,7 +86,19 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, packet->append(header, sizeof(header)); int8store(buf+R_POS_OFFSET,position); packet->append(buf, ROTATE_HEADER_LEN); - packet->append(p,ident_len); + packet->append(p, ident_len); + + if (do_checksum) + { + char b[BINLOG_CHECKSUM_LEN]; + ha_checksum crc= my_checksum(0L, NULL, 0); + crc= my_checksum(crc, (uchar*)header, sizeof(header)); + crc= my_checksum(crc, (uchar*)buf, ROTATE_HEADER_LEN); + crc= my_checksum(crc, (uchar*)p, ident_len); + int4store(b, crc); + packet->append(b, sizeof(b)); + } + if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) { *errmsg = "failed on my_net_write()"; @@ -153,6 +180,86 @@ static int send_file(THD *thd) } +/** + Internal to mysql_binlog_send() routine that recalculates checksum for + a FD event (asserted) that needs additional arranment prior sending to slave. +*/ +inline void fix_checksum(String *packet, ulong ev_offset) +{ + /* recalculate the crc for this event */ + uint data_len = uint4korr(packet->ptr() + ev_offset + EVENT_LEN_OFFSET); + ha_checksum crc= my_checksum(0L, NULL, 0); + DBUG_ASSERT(data_len == + LOG_EVENT_MINIMAL_HEADER_LEN + FORMAT_DESCRIPTION_HEADER_LEN + + BINLOG_CHECKSUM_ALG_DESC_LEN + BINLOG_CHECKSUM_LEN); + crc= my_checksum(crc, (uchar *)packet->ptr() + ev_offset, data_len - + BINLOG_CHECKSUM_LEN); + int4store(packet->ptr() + ev_offset + data_len - BINLOG_CHECKSUM_LEN, crc); +} + + +static user_var_entry * get_binlog_checksum_uservar(THD * thd) +{ + LEX_STRING name= { C_STRING_WITH_LEN("master_binlog_checksum")}; + user_var_entry *entry= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, + name.length); + return entry; +} + +/** + Function for calling in mysql_binlog_send + to check if slave initiated checksum-handshake. + + @param[in] thd THD to access a user variable + + @return TRUE if handshake took place, FALSE otherwise +*/ + +static bool is_slave_checksum_aware(THD * thd) +{ + DBUG_ENTER("is_slave_checksum_aware"); + user_var_entry *entry= get_binlog_checksum_uservar(thd); + DBUG_RETURN(entry? true : false); +} + +/** + Function for calling in mysql_binlog_send + to get the value of @@binlog_checksum of the master at + time of checksum-handshake. + + The value tells the master whether to compute or not, and the slave + to verify or not the first artificial Rotate event's checksum. + + @param[in] thd THD to access a user variable + + @return value of @@binlog_checksum alg according to + @c enum enum_binlog_checksum_alg +*/ + +static uint8 get_binlog_checksum_value_at_connect(THD * thd) +{ + uint8 ret; + + DBUG_ENTER("get_binlog_checksum_value_at_connect"); + user_var_entry *entry= get_binlog_checksum_uservar(thd); + if (!entry) + { + ret= BINLOG_CHECKSUM_ALG_UNDEF; + } + else + { + DBUG_ASSERT(entry->type == STRING_RESULT); + String str; + uint dummy_errors; + str.copy(entry->value, entry->length, &my_charset_bin, &my_charset_bin, + &dummy_errors); + ret= (uint8) find_type ((char*) str.ptr(), &binlog_checksum_typelib, 1) - 1; + DBUG_ASSERT(ret <= BINLOG_CHECKSUM_ALG_CRC32); // while it's just on CRC32 alg + } + DBUG_RETURN(ret); +} + /* Adjust the position pointer in the binary log file for all running slaves @@ -327,6 +434,9 @@ Increase max_allowed_packet on master"; case LOG_READ_TRUNC: *errmsg = "binlog truncated in the middle of event"; break; + case LOG_READ_CHECKSUM_FAILURE: + *errmsg = "event read from binlog did not pass crc check"; + break; default: *errmsg = "unknown error reading log event on the master"; break; @@ -353,6 +463,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, NET* net = &thd->net; pthread_mutex_t *log_lock; bool binlog_can_be_corrupted= FALSE; + uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; + #ifndef DBUG_OFF int left_events = max_binlog_dump_events; #endif @@ -450,7 +562,8 @@ impossible position"; given that we want minimum modification of 4.0, we send the normal and fake Rotates. */ - if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg)) + if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg, + get_binlog_checksum_value_at_connect(current_thd))) { /* This error code is not perfect, as fake_rotate_event() does not @@ -480,8 +593,8 @@ impossible position"; Try to find a Format_description_log_event at the beginning of the binlog */ - if (!(error = Log_event::read_log_event(&log, packet, log_lock))) - { + if (!(error = Log_event::read_log_event(&log, packet, log_lock, 0))) + { /* The packet has offsets equal to the normal offsets in a binlog event +1 (the first character is \0). @@ -491,6 +604,23 @@ impossible position"; (*packet)[EVENT_TYPE_OFFSET+1])); if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) { + current_checksum_alg= get_checksum_alg(packet->ptr() + 1, + packet->length() - 1); + DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF || + current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || + current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32); + if (!is_slave_checksum_aware(thd) && + current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + errmsg= "Slave can not handle replication events with the checksum " + "that master is configured to log"; + sql_print_warning("Master is configured to log replication events " + "with checksum, but will not send such events to " + "slaves that cannot process them"); + goto err; + } binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] & LOG_EVENT_BINLOG_IN_USE_F); (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F; @@ -506,6 +636,12 @@ impossible position"; */ int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+ ST_CREATED_OFFSET+1, (ulong) 0); + + /* fix the checksum due to latest changes in header */ + if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) + fix_checksum(packet, 1); + /* send it */ if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) { @@ -544,7 +680,8 @@ impossible position"; while (!net->error && net->vio != 0 && !thd->killed) { - while (!(error = Log_event::read_log_event(&log, packet, log_lock))) + while (!(error = Log_event::read_log_event(&log, packet, log_lock, + current_checksum_alg))) { #ifndef DBUG_OFF if (max_binlog_dump_events && !left_events--) @@ -558,6 +695,23 @@ impossible position"; if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) { + current_checksum_alg= get_checksum_alg(packet->ptr() + 1, + packet->length() - 1); + DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF || + current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || + current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32); + if (!is_slave_checksum_aware(thd) && + current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + errmsg= "Slave can not handle replication events with the checksum " + "that master is configured to log"; + sql_print_warning("Master is configured to log replication events " + "with checksum, but will not send such events to " + "slaves that cannot process them"); + goto err; + } binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] & LOG_EVENT_BINLOG_IN_USE_F); (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F; @@ -594,7 +748,8 @@ impossible position"; here we were reading binlog that was not closed properly (as a result of a crash ?). treat any corruption as EOF */ - if (binlog_can_be_corrupted && error != LOG_READ_MEM) + if (binlog_can_be_corrupted && + (error != LOG_READ_MEM && error != LOG_READ_CHECKSUM_FAILURE)) error=LOG_READ_EOF; /* TODO: now that we are logging the offset, check to make sure @@ -649,7 +804,8 @@ impossible position"; */ pthread_mutex_lock(log_lock); - switch (error= Log_event::read_log_event(&log, packet, (pthread_mutex_t*) 0)) { + switch (error= Log_event::read_log_event(&log, packet, (pthread_mutex_t*) 0, + current_checksum_alg)) { case 0: /* we read successfully, so we'll need to send it to the slave */ pthread_mutex_unlock(log_lock); @@ -750,7 +906,7 @@ impossible position"; */ if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 || fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE, - &errmsg)) + &errmsg, current_checksum_alg)) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; goto err; @@ -1499,7 +1655,8 @@ bool mysql_show_binlog_events(THD* thd) This code will fail on a mixed relay log (one which has Format_desc then Rotate then Format_desc). */ - ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event); + ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event, + opt_master_verify_checksum); if (ev) { if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT) @@ -1521,8 +1678,12 @@ bool mysql_show_binlog_events(THD* thd) for (event_count = 0; (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0, - description_event)); ) + description_event, + opt_master_verify_checksum)); ) { + if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT) + description_event->checksum_alg= ev->checksum_alg; + if (event_count >= limit_start && ev->net_send(protocol, linfo.log_file_name, pos)) { @@ -1815,6 +1976,12 @@ static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retrie &slave_trans_retries); static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period); static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter"); +static sys_var_bool_ptr sys_master_verify_checksum(&vars, + "master_verify_checksum", + &opt_master_verify_checksum); +static sys_var_bool_ptr sys_slave_sql_verify_checksum(&vars, + "slave_sql_verify_checksum", + &opt_slave_sql_verify_checksum); bool sys_var_slave_skip_counter::check(THD *thd, set_var *var) |