diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-09-04 12:22:09 +0200 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-09-04 12:22:09 +0200 |
commit | ada15c7a0f7947073664451c3804ab03723c657e (patch) | |
tree | 9a4366bccc5f87f1f8ad2700de1c4f5978ede620 | |
parent | 378bd0442a62d1067d19c67dddc3d8b27fc8a537 (diff) | |
download | mariadb-git-ada15c7a0f7947073664451c3804ab03723c657e.tar.gz |
Fix various places where code would work incorrectly if the common_header_len of events is different on master and slave
Patch developed with the help of Pavel Ivanov.
Also fix an uninitialised variable in queue_event().
-rw-r--r-- | sql/log.cc | 16 | ||||
-rw-r--r-- | sql/log.h | 1 | ||||
-rw-r--r-- | sql/log_event.cc | 29 | ||||
-rw-r--r-- | sql/log_event.h | 5 | ||||
-rw-r--r-- | sql/slave.cc | 35 | ||||
-rw-r--r-- | sql/sql_repl.cc | 72 |
6 files changed, 122 insertions, 36 deletions
diff --git a/sql/log.cc b/sql/log.cc index 041a0b555d1..f882cd5d64c 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -4806,12 +4806,23 @@ end: } -bool MYSQL_BIN_LOG::append(Log_event* ev) +bool +MYSQL_BIN_LOG::append(Log_event *ev) { - bool error = 0; + bool res; mysql_mutex_lock(&LOCK_log); + res= append_no_lock(ev); + mysql_mutex_unlock(&LOCK_log); + return res; +} + + +bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev) +{ + bool error = 0; DBUG_ENTER("MYSQL_BIN_LOG::append"); + mysql_mutex_assert_owner(&LOCK_log); DBUG_ASSERT(log_file.type == SEQ_READ_APPEND); /* Log_event::write() is smart enough to use my_b_write() or @@ -4829,7 +4840,6 @@ bool MYSQL_BIN_LOG::append(Log_event* ev) if (my_b_append_tell(&log_file) > max_size) error= new_file_without_locking(); err: - mysql_mutex_unlock(&LOCK_log); signal_update(); // Safe as we don't call close DBUG_RETURN(error); } diff --git a/sql/log.h b/sql/log.h index 11b9cd289f7..051ee8ea068 100644 --- a/sql/log.h +++ b/sql/log.h @@ -712,6 +712,7 @@ public: */ bool appendv(const char* buf,uint len,...); bool append(Log_event* ev); + bool append_no_lock(Log_event* ev); void mark_xids_active(ulong cookie, uint xid_count); void mark_xid_done(ulong cookie, bool write_checkpoint); diff --git a/sql/log_event.cc b/sql/log_event.cc index 45e4379710a..51fa9d77267 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -4754,16 +4754,15 @@ bool Format_description_log_event::write(IO_CACHE* file) We don't call Start_log_event_v3::write() because this would make 2 my_b_safe_write(). */ - uchar buff[FORMAT_DESCRIPTION_HEADER_LEN + BINLOG_CHECKSUM_ALG_DESC_LEN]; - size_t rec_size= sizeof(buff); + uchar buff[START_V3_HEADER_LEN+1]; + size_t rec_size= sizeof(buff) + BINLOG_CHECKSUM_ALG_DESC_LEN + + number_of_event_types; int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version); memcpy((char*) buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN); if (!dont_set_created) created= get_time(); int4store(buff + ST_CREATED_OFFSET,created); buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN; - memcpy((char*) buff+ST_COMMON_HEADER_LEN_OFFSET + 1, (uchar*) post_header_len, - LOG_EVENT_TYPES); /* if checksum is requested record the checksum-algorithm descriptor next to @@ -4776,7 +4775,7 @@ bool Format_description_log_event::write(IO_CACHE* file) #ifndef DBUG_OFF data_written= 0; // to prepare for need_checksum assert #endif - buff[FORMAT_DESCRIPTION_HEADER_LEN]= need_checksum() ? + uchar checksum_byte= need_checksum() ? checksum_alg : (uint8) BINLOG_CHECKSUM_ALG_OFF; /* FD of checksum-aware server is always checksum-equipped, (V) is in, @@ -4796,7 +4795,10 @@ bool Format_description_log_event::write(IO_CACHE* file) checksum_alg= BINLOG_CHECKSUM_ALG_CRC32; // Forcing (V) room to fill anyway } ret= (write_header(file, rec_size) || - wrapper_my_b_safe_write(file, buff, rec_size) || + wrapper_my_b_safe_write(file, buff, sizeof(buff)) || + wrapper_my_b_safe_write(file, (uchar*)post_header_len, + number_of_event_types) || + wrapper_my_b_safe_write(file, &checksum_byte, sizeof(checksum_byte)) || write_footer(file)); if (no_checksum) checksum_alg= BINLOG_CHECKSUM_ALG_OFF; @@ -6125,7 +6127,7 @@ bool Gtid_log_event::peek(const char *event_start, size_t event_len, uint8 checksum_alg, uint32 *domain_id, uint32 *server_id, uint64 *seq_no, - uchar *flags2) + uchar *flags2, const Format_description_log_event *fdev) { const char *p; @@ -6140,10 +6142,10 @@ Gtid_log_event::peek(const char *event_start, size_t event_len, DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || checksum_alg == BINLOG_CHECKSUM_ALG_OFF); - if (event_len < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN) + if (event_len < (uint32)fdev->common_header_len + GTID_HEADER_LEN) return true; *server_id= uint4korr(event_start + SERVER_ID_OFFSET); - p= event_start + LOG_EVENT_HEADER_LEN; + p= event_start + fdev->common_header_len; *seq_no= uint8korr(p); p+= 8; *domain_id= uint4korr(p); @@ -6581,7 +6583,8 @@ Gtid_list_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) bool Gtid_list_log_event::peek(const char *event_start, uint32 event_len, uint8 checksum_alg, - rpl_gtid **out_gtid_list, uint32 *out_list_len) + rpl_gtid **out_gtid_list, uint32 *out_list_len, + const Format_description_log_event *fdev) { const char *p; uint32 count_field, count; @@ -6598,13 +6601,13 @@ Gtid_list_log_event::peek(const char *event_start, uint32 event_len, DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || checksum_alg == BINLOG_CHECKSUM_ALG_OFF); - if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN) + if (event_len < (uint32)fdev->common_header_len + GTID_LIST_HEADER_LEN) return true; - p= event_start + LOG_EVENT_HEADER_LEN; + p= event_start + fdev->common_header_len; count_field= uint4korr(p); p+= 4; count= count_field & ((1<<28)-1); - if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN + + if (event_len < (uint32)fdev->common_header_len + GTID_LIST_HEADER_LEN + 16 * count) return true; if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(rpl_gtid)*count + (count == 0), diff --git a/sql/log_event.h b/sql/log_event.h index abb3b96bac4..39a6ce24036 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -3118,7 +3118,7 @@ public: static bool peek(const char *event_start, size_t event_len, uint8 checksum_alg, uint32 *domain_id, uint32 *server_id, uint64 *seq_no, - uchar *flags2); + uchar *flags2, const Format_description_log_event *fdev); #endif }; @@ -3232,7 +3232,8 @@ public: #endif static bool peek(const char *event_start, uint32 event_len, uint8 checksum_alg, - rpl_gtid **out_gtid_list, uint32 *out_list_len); + rpl_gtid **out_gtid_list, uint32 *out_list_len, + const Format_description_log_event *fdev); }; diff --git a/sql/slave.cc b/sql/slave.cc index 29514293c78..0a2fcb38b3d 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -4925,8 +4925,6 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) goto err; } - LINT_INIT(inc_pos); - if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 && (uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */) DBUG_RETURN(queue_old_event(mi,buf,event_len)); @@ -5182,7 +5180,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) if (Gtid_log_event::peek(buf, event_len, checksum_alg, &event_gtid.domain_id, &event_gtid.server_id, - &event_gtid.seq_no, &dummy_flag)) + &event_gtid.seq_no, &dummy_flag, + rli->relay_log.description_event_for_queue)) { error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; goto err; @@ -5240,15 +5239,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mi->gtid_current_pos.update(&mi->last_queued_gtid); mi->events_queued_since_last_gtid= 0; } - if (Gtid_log_event::peek(buf, event_len, checksum_alg, - &mi->last_queued_gtid.domain_id, - &mi->last_queued_gtid.server_id, - &mi->last_queued_gtid.seq_no, &dummy_flag)) - { - error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; - goto err; - } + mi->last_queued_gtid= event_gtid; ++mi->events_queued_since_last_gtid; + inc_pos= event_len; } break; @@ -5308,6 +5301,26 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) if (unlikely(gtid_skip_enqueue)) { mi->master_log_pos+= inc_pos; + if ((uchar)buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT && + s_id == mi->master_id) + { + /* + If we write this master's description event in the middle of an event + group due to GTID reconnect, SQL thread will think that master crashed + in the middle of the group and roll back the first half, so we must not. + + But we still have to write an artificial copy of the masters description + event, to override the initial slave-version description event so that + SQL thread has the right information for parsing the events it reads. + */ + rli->relay_log.description_event_for_queue->created= 0; + rli->relay_log.description_event_for_queue->set_artificial_event(); + if (rli->relay_log.append_no_lock + (rli->relay_log.description_event_for_queue)) + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + else + rli->relay_log.harvest_bytes_written(&rli->log_space_total); + } } else if ((s_id == global_system_variables.server_id && diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 3e4939b7fa8..546a3dca98c 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -1269,6 +1269,7 @@ gtid_state_from_pos(const char *name, uint32 offset, uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; int err; String packet; + Format_description_log_event *fdev= NULL; if (gtid_state->load((const rpl_gtid *)NULL, 0)) { @@ -1280,6 +1281,13 @@ gtid_state_from_pos(const char *name, uint32 offset, if ((file= open_binlog(&cache, name, &errormsg)) == (File)-1) return errormsg; + if (!(fdev= new Format_description_log_event(3))) + { + errormsg= "Out of memory initializing format_description event " + "while scanning binlog to find start position"; + goto end; + } + /* First we need to find the initial GTID_LIST_EVENT. We need this even if the offset is at the very start of the binlog file. @@ -1315,6 +1323,8 @@ gtid_state_from_pos(const char *name, uint32 offset, typ= (Log_event_type)(uchar)packet[EVENT_TYPE_OFFSET]; if (typ == FORMAT_DESCRIPTION_EVENT) { + Format_description_log_event *tmp; + if (found_format_description_event) { errormsg= "Duplicate format description log event found while " @@ -1324,6 +1334,15 @@ gtid_state_from_pos(const char *name, uint32 offset, current_checksum_alg= get_checksum_alg(packet.ptr(), packet.length()); found_format_description_event= true; + if (!(tmp= new Format_description_log_event(packet.ptr(), packet.length(), + fdev))) + { + errormsg= "Corrupt Format_description event found or out-of-memory " + "while searching for old-style position in binlog"; + goto end; + } + delete fdev; + fdev= tmp; } else if (typ != FORMAT_DESCRIPTION_EVENT && !found_format_description_event) { @@ -1348,7 +1367,7 @@ gtid_state_from_pos(const char *name, uint32 offset, } status= Gtid_list_log_event::peek(packet.ptr(), packet.length(), current_checksum_alg, - >id_list, &list_len); + >id_list, &list_len, fdev); if (status) { errormsg= "Error reading Gtid_list_log_event while searching " @@ -1376,7 +1395,7 @@ gtid_state_from_pos(const char *name, uint32 offset, uchar flags2; if (Gtid_log_event::peek(packet.ptr(), packet.length(), current_checksum_alg, >id.domain_id, - >id.server_id, >id.seq_no, &flags2)) + >id.server_id, >id.seq_no, &flags2, fdev)) { errormsg= "Corrupt gtid_log_event found while scanning binlog to find " "initial slave position"; @@ -1399,6 +1418,7 @@ gtid_state_from_pos(const char *name, uint32 offset, } end: + delete fdev; end_io_cache(&cache); mysql_file_close(file, MYF(MY_WME)); @@ -1502,7 +1522,8 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, enum_gtid_until_state *gtid_until_group, rpl_binlog_state *until_binlog_state, bool slave_gtid_strict_mode, rpl_gtid *error_gtid, - bool *send_fake_gtid_list) + bool *send_fake_gtid_list, + Format_description_log_event *fdev) { my_off_t pos; size_t len= packet->length(); @@ -1516,7 +1537,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, if (ev_offset > len || Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset, current_checksum_alg, - >id_list, &list_len)) + >id_list, &list_len, fdev)) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed to read Gtid_list_log_event: corrupt binlog"; @@ -1545,7 +1566,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset, current_checksum_alg, &event_gtid.domain_id, &event_gtid.server_id, - &event_gtid.seq_no, &flags2)) + &event_gtid.seq_no, &flags2, fdev)) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed to read Gtid_log_event: corrupt binlog"; @@ -1881,6 +1902,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; int old_max_allowed_packet= thd->variables.max_allowed_packet; + Format_description_log_event *fdev= NULL; #ifndef DBUG_OFF int left_events = max_binlog_dump_events; @@ -1956,6 +1978,13 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, } #endif + if (!(fdev= new Format_description_log_event(3))) + { + errmsg= "Out of memory initializing format_description event"; + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + goto err; + } + if (!mysql_bin_log.is_open()) { errmsg = "Binary log is not open"; @@ -2119,6 +2148,8 @@ impossible position"; (*packet)[EVENT_TYPE_OFFSET+ev_offset])); if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT) { + Format_description_log_event *tmp; + current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset, packet->length() - ev_offset); DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF || @@ -2136,6 +2167,18 @@ impossible position"; "slaves that cannot process them"); goto err; } + + if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset, + packet->length()-ev_offset, + fdev))) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + errmsg= "Corrupt Format_description event found or out-of-memory"; + goto err; + } + delete fdev; + fdev= tmp; + (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; /* mark that this event with "log_pos=0", so the slave @@ -2253,6 +2296,8 @@ impossible position"; #endif if (event_type == FORMAT_DESCRIPTION_EVENT) { + Format_description_log_event *tmp; + current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset, packet->length() - ev_offset); DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF || @@ -2271,6 +2316,17 @@ impossible position"; goto err; } + if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset, + packet->length()-ev_offset, + fdev))) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + errmsg= "Corrupt Format_description event found or out-of-memory"; + goto err; + } + delete fdev; + fdev= tmp; + (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; } @@ -2295,7 +2351,7 @@ impossible position"; until_gtid_state, >id_until_group, &until_binlog_state, slave_gtid_strict_mode, &error_gtid, - &send_fake_gtid_list))) + &send_fake_gtid_list, fdev))) { errmsg= tmp_msg; goto err; @@ -2501,7 +2557,7 @@ impossible position"; >id_skip_group, until_gtid_state, >id_until_group, &until_binlog_state, slave_gtid_strict_mode, &error_gtid, - &send_fake_gtid_list))) + &send_fake_gtid_list, fdev))) { errmsg= tmp_msg; goto err; @@ -2599,6 +2655,7 @@ end: thd->current_linfo = 0; mysql_mutex_unlock(&LOCK_thread_count); thd->variables.max_allowed_packet= old_max_allowed_packet; + delete fdev; DBUG_VOID_RETURN; err: @@ -2674,6 +2731,7 @@ err: if (file >= 0) mysql_file_close(file, MYF(MY_WME)); thd->variables.max_allowed_packet= old_max_allowed_packet; + delete fdev; my_message(my_errno, error_text, MYF(0)); DBUG_VOID_RETURN; |