diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-05-28 15:39:56 +0200 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-05-28 15:39:56 +0200 |
commit | a0fd7382bc1e4a8ba0affd27d3034f445a4bb453 (patch) | |
tree | c8b4b1dabddbc47829a890d74025f7889e692425 /sql | |
parent | 08ce9bfe057b6cd31e7fbca4e4e9e48edde242fb (diff) | |
parent | ee2b7db3f88f6882022a8aa71b30043ed8b40792 (diff) | |
download | mariadb-git-a0fd7382bc1e4a8ba0affd27d3034f445a4bb453.tar.gz |
Merge 10.0-base -> 10.0
Diffstat (limited to 'sql')
-rw-r--r-- | sql/item_strfunc.cc | 5 | ||||
-rw-r--r-- | sql/lex.h | 5 | ||||
-rw-r--r-- | sql/log.cc | 133 | ||||
-rw-r--r-- | sql/log.h | 5 | ||||
-rw-r--r-- | sql/log_event.cc | 104 | ||||
-rw-r--r-- | sql/log_event.h | 28 | ||||
-rw-r--r-- | sql/multi_range_read.cc | 4 | ||||
-rw-r--r-- | sql/mysqld.cc | 13 | ||||
-rw-r--r-- | sql/mysqld.h | 5 | ||||
-rw-r--r-- | sql/opt_range.cc | 38 | ||||
-rw-r--r-- | sql/rpl_gtid.cc | 313 | ||||
-rw-r--r-- | sql/rpl_gtid.h | 23 | ||||
-rw-r--r-- | sql/rpl_mi.cc | 15 | ||||
-rw-r--r-- | sql/rpl_mi.h | 10 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 15 | ||||
-rw-r--r-- | sql/rpl_rli.h | 8 | ||||
-rw-r--r-- | sql/set_var.cc | 2 | ||||
-rw-r--r-- | sql/share/errmsg-utf8.txt | 10 | ||||
-rw-r--r-- | sql/slave.cc | 152 | ||||
-rw-r--r-- | sql/sql_join_cache.cc | 4 | ||||
-rw-r--r-- | sql/sql_lex.h | 13 | ||||
-rw-r--r-- | sql/sql_repl.cc | 739 | ||||
-rw-r--r-- | sql/sql_repl.h | 2 | ||||
-rw-r--r-- | sql/sql_show.cc | 4 | ||||
-rw-r--r-- | sql/sql_statistics.cc | 2 | ||||
-rw-r--r-- | sql/sql_yacc.yy | 41 | ||||
-rw-r--r-- | sql/sys_vars.cc | 111 | ||||
-rw-r--r-- | sql/sys_vars.h | 108 |
28 files changed, 1531 insertions, 381 deletions
diff --git a/sql/item_strfunc.cc b/sql/item_strfunc.cc index e5c3c48b3bb..5dcd83681c6 100644 --- a/sql/item_strfunc.cc +++ b/sql/item_strfunc.cc @@ -3865,7 +3865,10 @@ bool Item_func_dyncol_create::fix_fields(THD *thd, Item **ref) vals= (DYNAMIC_COLUMN_VALUE *) alloc_root(thd->mem_root, sizeof(DYNAMIC_COLUMN_VALUE) * (arg_count / 2)); - for (i= 0; i + 1 < arg_count && args[i]->result_type() == INT_RESULT; i+= 2); + for (i= 0; + i + 1 < arg_count && args[i]->result_type() == INT_RESULT; + i+= 2) + ; if (i + 1 < arg_count) { names= TRUE; diff --git a/sql/lex.h b/sql/lex.h index c579ea84533..7edb1456e09 100644 --- a/sql/lex.h +++ b/sql/lex.h @@ -152,6 +152,7 @@ static SYMBOL symbols[] = { { "CROSS", SYM(CROSS)}, { "CUBE", SYM(CUBE_SYM)}, { "CURRENT_DATE", SYM(CURDATE)}, + { "CURRENT_POS", SYM(CURRENT_POS_SYM)}, { "CURRENT_TIME", SYM(CURTIME)}, { "CURRENT_TIMESTAMP", SYM(NOW_SYM)}, { "CURRENT_USER", SYM(CURRENT_USER)}, @@ -330,7 +331,7 @@ static SYMBOL symbols[] = { { "LOW_PRIORITY", SYM(LOW_PRIORITY)}, { "MASTER", SYM(MASTER_SYM)}, { "MASTER_CONNECT_RETRY", SYM(MASTER_CONNECT_RETRY_SYM)}, - { "MASTER_USE_GTID", SYM(MASTER_USE_GTID_SYM)}, + { "MASTER_GTID_POS", SYM(MASTER_GTID_POS_SYM)}, { "MASTER_HOST", SYM(MASTER_HOST_SYM)}, { "MASTER_LOG_FILE", SYM(MASTER_LOG_FILE_SYM)}, { "MASTER_LOG_POS", SYM(MASTER_LOG_POS_SYM)}, @@ -347,6 +348,7 @@ static SYMBOL symbols[] = { { "MASTER_SSL_KEY", SYM(MASTER_SSL_KEY_SYM)}, { "MASTER_SSL_VERIFY_SERVER_CERT", SYM(MASTER_SSL_VERIFY_SERVER_CERT_SYM)}, { "MASTER_USER", SYM(MASTER_USER_SYM)}, + { "MASTER_USE_GTID", SYM(MASTER_USE_GTID_SYM)}, { "MASTER_HEARTBEAT_PERIOD", SYM(MASTER_HEARTBEAT_PERIOD_SYM)}, { "MATCH", SYM(MATCH)}, { "MAX_CONNECTIONS_PER_HOUR", SYM(MAX_CONNECTIONS_PER_HOUR)}, @@ -518,6 +520,7 @@ static SYMBOL symbols[] = { { "SIMPLE", SYM(SIMPLE_SYM)}, { "SLAVE", SYM(SLAVE)}, { "SLAVES", SYM(SLAVES)}, + { "SLAVE_POS", SYM(SLAVE_POS_SYM)}, { "SLOW", SYM(SLOW)}, { "SNAPSHOT", SYM(SNAPSHOT_SYM)}, { "SMALLINT", SYM(SMALLINT)}, diff --git a/sql/log.cc b/sql/log.cc index 48c458c2607..6726be36e74 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -119,7 +119,6 @@ static MYSQL_BIN_LOG::xid_count_per_binlog * static bool start_binlog_background_thread(); - static rpl_binlog_state rpl_global_gtid_binlog_state; /** @@ -3007,6 +3006,14 @@ void MYSQL_BIN_LOG::cleanup() mysql_cond_destroy(&COND_binlog_background_thread); mysql_cond_destroy(&COND_binlog_background_thread_end); } + + /* + Free data for global binlog state. + We can't do that automaticly as we need to do this before + safemalloc is shut down + */ + if (!is_relay_log) + rpl_global_gtid_binlog_state.free(); DBUG_VOID_RETURN; } @@ -3286,7 +3293,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, there had been an entry (domain_id, server_id, 0). */ - Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state); + Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state, 0); if (gl_ev.write(&log_file)) goto err; @@ -3847,9 +3854,6 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log) if (!is_relay_log) { rpl_global_gtid_binlog_state.reset(); - mysql_mutex_lock(&LOCK_gtid_counter); - global_gtid_counter= 0; - mysql_mutex_unlock(&LOCK_gtid_counter); } /* Start logging with a new file */ @@ -5373,9 +5377,11 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, bool is_transactional) { rpl_gtid gtid; - uint64 seq_no; + uint32 domain_id= thd->variables.gtid_domain_id; + uint32 server_id= thd->variables.server_id; + uint64 seq_no= thd->variables.gtid_seq_no; + int err; - seq_no= thd->variables.gtid_seq_no; /* Reset the session variable gtid_seq_no, to reduce the risk of accidentally producing a duplicate GTID. @@ -5383,34 +5389,36 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, thd->variables.gtid_seq_no= 0; if (seq_no != 0) { - /* - If we see a higher sequence number, use that one as the basis of any - later generated sequence numbers. - */ - bump_seq_no_counter_if_needed(seq_no); + /* Use the specified sequence number. */ + gtid.domain_id= domain_id; + gtid.server_id= server_id; + gtid.seq_no= seq_no; + mysql_mutex_lock(&LOCK_rpl_gtid_state); + err= rpl_global_gtid_binlog_state.update(>id, opt_gtid_strict_mode); + mysql_mutex_unlock(&LOCK_rpl_gtid_state); + if (err && thd->stmt_da->sql_errno()==ER_GTID_STRICT_OUT_OF_ORDER) + errno= ER_GTID_STRICT_OUT_OF_ORDER; } else { - mysql_mutex_lock(&LOCK_gtid_counter); - seq_no= ++global_gtid_counter; - mysql_mutex_unlock(&LOCK_gtid_counter); + /* Allocate the next sequence number for the GTID. */ + mysql_mutex_lock(&LOCK_rpl_gtid_state); + err= rpl_global_gtid_binlog_state.update_with_next_gtid(domain_id, + server_id, >id); + mysql_mutex_unlock(&LOCK_rpl_gtid_state); + seq_no= gtid.seq_no; } - gtid.seq_no= seq_no; - gtid.domain_id= thd->variables.gtid_domain_id; + if (err) + return true; - Gtid_log_event gtid_event(thd, gtid.seq_no, gtid.domain_id, standalone, + Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone, LOG_EVENT_SUPPRESS_USE_F, is_transactional); - gtid.server_id= gtid_event.server_id; /* Write the event to the binary log. */ if (gtid_event.write(&mysql_bin_log.log_file)) return true; status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written); - /* Update the replication state (last GTID in each replication domain). */ - mysql_mutex_lock(&LOCK_rpl_gtid_state); - rpl_global_gtid_binlog_state.update(>id); - mysql_mutex_unlock(&LOCK_rpl_gtid_state); return false; } @@ -5511,9 +5519,6 @@ end: end_io_cache(&cache); if (opened) mysql_file_close(file_no, MYF(0)); - /* Pick the next unused seq_no from the loaded binlog state. */ - bump_seq_no_counter_if_needed( - rpl_global_gtid_binlog_state.seq_no_from_state()); return err; } @@ -5527,6 +5532,18 @@ MYSQL_BIN_LOG::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size) bool +MYSQL_BIN_LOG::append_state_pos(String *str) +{ + bool err; + + mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + err= rpl_global_gtid_binlog_state.append_pos(str); + mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + return err; +} + + +bool MYSQL_BIN_LOG::find_in_binlog_state(uint32 domain_id, uint32 server_id, rpl_gtid *out_gtid) { @@ -5543,33 +5560,44 @@ bool MYSQL_BIN_LOG::lookup_domain_in_binlog_state(uint32 domain_id, rpl_gtid *out_gtid) { - rpl_binlog_state::element *elem; - bool res; + rpl_gtid *found_gtid; + bool res= false; mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); - elem= (rpl_binlog_state::element *) - my_hash_search(&rpl_global_gtid_binlog_state.hash, - (const uchar *)&domain_id, 0); - if (elem) + if ((found_gtid= rpl_global_gtid_binlog_state.find_most_recent(domain_id))) { + *out_gtid= *found_gtid; res= true; - *out_gtid= *elem->last_gtid; } - else - res= false; mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); return res; } -void -MYSQL_BIN_LOG::bump_seq_no_counter_if_needed(uint64 seq_no) +int +MYSQL_BIN_LOG::bump_seq_no_counter_if_needed(uint32 domain_id, uint64 seq_no) { - mysql_mutex_lock(&LOCK_gtid_counter); - if (global_gtid_counter < seq_no) - global_gtid_counter= seq_no; - mysql_mutex_unlock(&LOCK_gtid_counter); + int err; + + mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + err= rpl_global_gtid_binlog_state.bump_seq_no_if_needed(domain_id, seq_no); + mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + return err; +} + + +bool +MYSQL_BIN_LOG::check_strict_gtid_sequence(uint32 domain_id, uint32 server_id, + uint64 seq_no) +{ + bool err; + + mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + err= rpl_global_gtid_binlog_state.check_strict_sequence(domain_id, server_id, + seq_no); + mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + return err; } @@ -5642,7 +5670,8 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) my_org_b_tell= my_b_tell(file); mysql_mutex_lock(&LOCK_log); prev_binlog_id= current_binlog_id; - write_gtid_event(thd, true, using_trans); + if (write_gtid_event(thd, true, using_trans)) + goto err; } else { @@ -6711,8 +6740,8 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) */ DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || !cache_mngr->trx_cache.empty()); - current->error= write_transaction_or_stmt(current); - + if ((current->error= write_transaction_or_stmt(current))) + current->commit_errno= errno; strmake(cache_mngr->last_commit_pos_file, log_file_name, sizeof(cache_mngr->last_commit_pos_file)-1); commit_offset= my_b_write_tell(&log_file); @@ -8257,9 +8286,6 @@ int TC_LOG_BINLOG::open(const char *opt_name) error= recover(&log_info, log_name, &log, (Format_description_log_event *)ev); state_read= true; - /* Pick the next unused seq_no from the recovered binlog state. */ - bump_seq_no_counter_if_needed( - rpl_global_gtid_binlog_state.seq_no_from_state()); } else error= read_state_from_file(); @@ -8513,7 +8539,7 @@ binlog_background_thread(void *arg __attribute__((unused))) thd->store_globals(); /* - Load the slave replication GTID state from the mysql.rpl_slave_state + Load the slave replication GTID state from the mysql.gtid_slave_pos table. This is mostly so that we can start our seq_no counter from the highest @@ -8723,16 +8749,11 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, case GTID_LIST_EVENT: if (first_round) { - uint32 i; Gtid_list_log_event *glev= (Gtid_list_log_event *)ev; /* Initialise the binlog state from the Gtid_list event. */ - rpl_global_gtid_binlog_state.reset(); - for (i= 0; i < glev->count; ++i) - { - if (rpl_global_gtid_binlog_state.update(&(glev->list[i]))) - goto err2; - } + if (rpl_global_gtid_binlog_state.load(glev->list, glev->count)) + goto err2; } break; @@ -8746,7 +8767,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, gtid.domain_id= gev->domain_id; gtid.server_id= gev->server_id; gtid.seq_no= gev->seq_no; - if (rpl_global_gtid_binlog_state.update(>id)) + if (rpl_global_gtid_binlog_state.update(>id, false)) goto err2; } break; diff --git a/sql/log.h b/sql/log.h index bd20c8aee09..018ac64eff7 100644 --- a/sql/log.h +++ b/sql/log.h @@ -779,10 +779,13 @@ public: int read_state_from_file(); int write_state_to_file(); int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size); + bool append_state_pos(String *str); bool find_in_binlog_state(uint32 domain_id, uint32 server_id, rpl_gtid *out_gtid); bool lookup_domain_in_binlog_state(uint32 domain_id, rpl_gtid *out_gtid); - void bump_seq_no_counter_if_needed(uint64 seq_no); + int bump_seq_no_counter_if_needed(uint32 domain_id, uint64 seq_no); + bool check_strict_gtid_sequence(uint32 domain_id, uint32 server_id, + uint64 seq_no); }; class Log_event_handler diff --git a/sql/log_event.cc b/sql/log_event.cc index 3eddd8bf2eb..fce0130e8dd 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -4004,7 +4004,7 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0; gtid= rli->current_gtid; - if (rpl_global_gtid_slave_state.record_gtid(thd, >id, sub_id, true)) + if (rpl_global_gtid_slave_state.record_gtid(thd, >id, sub_id, true, false)) { rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, "Error during COMMIT: failed to update GTID state in " @@ -6214,6 +6214,15 @@ Gtid_log_event::do_apply_event(Relay_log_info const *rli) thd->variables.gtid_domain_id= this->domain_id; thd->variables.gtid_seq_no= this->seq_no; + if (opt_gtid_strict_mode && opt_bin_log && opt_log_slave_updates) + { + /* Need to reset prior "ok" status to give an error. */ + thd->clear_error(); + thd->stmt_da->reset_diagnostics_area(); + if (mysql_bin_log.check_strict_gtid_sequence(this->domain_id, + this->server_id, this->seq_no)) + return 1; + } if (flags2 & FL_STANDALONE) return 0; @@ -6320,6 +6329,7 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len, : Log_event(buf, description_event), count(0), list(0) { uint32 i; + uint32 val; uint8 header_size= description_event->common_header_len; uint8 post_header_len= description_event->post_header_len[GTID_LIST_EVENT-1]; if (event_len < header_size + post_header_len || @@ -6327,7 +6337,9 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len, return; buf+= header_size; - count= uint4korr(buf) & ((1<<28)-1); + val= uint4korr(buf); + count= val & ((1<<28)-1); + gl_flags= val & ((uint32)0xf << 28); buf+= 4; if (event_len - (header_size + post_header_len) < count*element_size || (!(list= (rpl_gtid *)my_malloc(count*sizeof(*list) + (count == 0), @@ -6348,8 +6360,9 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len, #ifdef MYSQL_SERVER -Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set) - : count(gtid_set->count()), list(0) +Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set, + uint32 gl_flags_) + : count(gtid_set->count()), gl_flags(gl_flags_), list(0) { cache_type= EVENT_NO_CACHE; /* Failure to allocate memory will be caught by is_valid() returning false. */ @@ -6359,32 +6372,73 @@ Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set) gtid_set->get_gtid_list(list, count); } + +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) bool -Gtid_list_log_event::write(IO_CACHE *file) +Gtid_list_log_event::to_packet(String *packet) { uint32 i; - uchar buf[element_size]; + uchar *p; + uint32 needed_length; DBUG_ASSERT(count < 1<<28); - if (write_header(file, get_data_size())) - return 1; - int4store(buf, count & ((1<<28)-1)); - if (wrapper_my_b_safe_write(file, buf, GTID_LIST_HEADER_LEN)) - return 1; + needed_length= packet->length() + get_data_size(); + if (packet->reserve(needed_length)) + return true; + p= (uchar *)packet->ptr() + packet->length();; + packet->length(needed_length); + int4store(p, (count & ((1<<28)-1)) | gl_flags); + p += 4; + /* Initialise the padding for empty Gtid_list. */ + if (count == 0) + int2store(p, 0); for (i= 0; i < count; ++i) { - int4store(buf, list[i].domain_id); - int4store(buf+4, list[i].server_id); - int8store(buf+8, list[i].seq_no); - if (wrapper_my_b_safe_write(file, buf, element_size)) - return 1; + int4store(p, list[i].domain_id); + int4store(p+4, list[i].server_id); + int8store(p+8, list[i].seq_no); + p += 16; } - return write_footer(file); + + return false; +} + + +bool +Gtid_list_log_event::write(IO_CACHE *file) +{ + char buf[128]; + String packet(buf, sizeof(buf), system_charset_info); + + packet.length(0); + if (to_packet(&packet)) + return true; + return + write_header(file, get_data_size()) || + wrapper_my_b_safe_write(file, (uchar *)packet.ptr(), packet.length()) || + write_footer(file); +} + + +int +Gtid_list_log_event::do_apply_event(Relay_log_info const *rli) +{ + int ret= Log_event::do_apply_event(rli); + if (rli->until_condition == Relay_log_info::UNTIL_GTID && + (gl_flags & FLAG_UNTIL_REACHED)) + { + char str_buf[128]; + String str(str_buf, sizeof(str_buf), system_charset_info); + const_cast<Relay_log_info*>(rli)->until_gtid_pos.to_string(&str); + sql_print_information("Slave SQL thread stops because it reached its" + " UNTIL master_gtid_pos %s", str.c_ptr_safe()); + const_cast<Relay_log_info*>(rli)->abort_slave= true; + } + return ret; } -#ifdef HAVE_REPLICATION void Gtid_list_log_event::pack_info(THD *thd, Protocol *protocol) { @@ -6439,12 +6493,24 @@ Gtid_list_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) */ bool Gtid_list_log_event::peek(const char *event_start, uint32 event_len, + uint8 checksum_alg, rpl_gtid **out_gtid_list, uint32 *out_list_len) { const char *p; uint32 count_field, count; rpl_gtid *gtid_list; + if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32) + { + if (event_len > BINLOG_CHECKSUM_LEN) + event_len-= BINLOG_CHECKSUM_LEN; + else + event_len= 0; + } + else + DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || + checksum_alg == BINLOG_CHECKSUM_ALG_OFF); + if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN) return true; p= event_start + LOG_EVENT_HEADER_LEN; @@ -6841,7 +6907,7 @@ int Xid_log_event::do_apply_event(Relay_log_info const *rli) const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0; gtid= rli->current_gtid; - err= rpl_global_gtid_slave_state.record_gtid(thd, >id, sub_id, true); + err= rpl_global_gtid_slave_state.record_gtid(thd, >id, sub_id, true, false); if (err) { rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, diff --git a/sql/log_event.h b/sql/log_event.h index 5026b280b27..b5b488f320d 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -3116,7 +3116,7 @@ public: <td>count</td> <td>4 byte unsigned integer</td> <td>The lower 28 bits are the number of GTIDs. The upper 4 bits are - reserved for flags bits for future expansion</td> + flags bits.</td> </tr> </table> @@ -3149,18 +3149,28 @@ public: </table> The three elements in the body repeat COUNT times to form the GTID list. + + At the time of writing, only one flag bit is in use. + + Bit 28 of `count' is used for flag FLAG_UNTIL_REACHED, which is sent in a + Gtid_list event from the master to the slave to indicate that the START + SLAVE UNTIL master_gtid_pos=xxx condition has been reached. (This flag is + only sent in "fake" events generated on the fly, it is not written into + the binlog). */ class Gtid_list_log_event: public Log_event { public: uint32 count; + uint32 gl_flags; struct rpl_gtid *list; static const uint element_size= 4+4+8; + static const uint32 FLAG_UNTIL_REACHED= (1<<28); #ifdef MYSQL_SERVER - Gtid_list_log_event(rpl_binlog_state *gtid_set); + Gtid_list_log_event(rpl_binlog_state *gtid_set, uint32 gl_flags); #ifdef HAVE_REPLICATION void pack_info(THD *thd, Protocol *protocol); #endif @@ -3171,12 +3181,22 @@ public: const Format_description_log_event *description_event); ~Gtid_list_log_event() { my_free(list); } Log_event_type get_type_code() { return GTID_LIST_EVENT; } - int get_data_size() { return GTID_LIST_HEADER_LEN + count*element_size; } + int get_data_size() { + /* + Replacing with dummy event, needed for older slaves, requires a minimum + of 6 bytes in the body. + */ + return (count==0 ? + GTID_LIST_HEADER_LEN+2 : GTID_LIST_HEADER_LEN+count*element_size); + } bool is_valid() const { return list != NULL; } -#ifdef MYSQL_SERVER +#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) + bool to_packet(String *packet); bool write(IO_CACHE *file); + virtual int do_apply_event(Relay_log_info const *rli); #endif static bool peek(const char *event_start, uint32 event_len, + uint8 checksum_alg, rpl_gtid **out_gtid_list, uint32 *out_list_len); }; diff --git a/sql/multi_range_read.cc b/sql/multi_range_read.cc index b6133eac3ae..3ea65b16e3d 100644 --- a/sql/multi_range_read.cc +++ b/sql/multi_range_read.cc @@ -1199,9 +1199,9 @@ bool DsMrr_impl::setup_buffer_sharing(uint key_size_in_keybuf, statistics? */ uint parts= my_count_bits(key_tuple_map); - ulong rpc; + ha_rows rpc; ulonglong rowids_size= rowid_buf_elem_size; - if ((rpc= key_info->actual_rec_per_key(parts - 1))) + if ((rpc= (ha_rows) key_info->actual_rec_per_key(parts - 1))) rowids_size= rowid_buf_elem_size * rpc; double fraction_for_rowids= diff --git a/sql/mysqld.cc b/sql/mysqld.cc index fd8b990b6c3..0a49eb0c7ee 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -681,7 +681,7 @@ mysql_mutex_t mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats, LOCK_global_table_stats, LOCK_global_index_stats; -mysql_mutex_t LOCK_gtid_counter, LOCK_rpl_gtid_state; +mysql_mutex_t LOCK_rpl_gtid_state; /** The below lock protects access to two global server variables: @@ -850,7 +850,7 @@ PSI_mutex_key key_LOCK_stats, key_LOCK_global_index_stats, key_LOCK_wakeup_ready; -PSI_mutex_key key_LOCK_gtid_counter, key_LOCK_rpl_gtid_state; +PSI_mutex_key key_LOCK_rpl_gtid_state; PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered; @@ -895,7 +895,6 @@ static PSI_mutex_info all_server_mutexes[]= { &key_LOCK_global_table_stats, "LOCK_global_table_stats", PSI_FLAG_GLOBAL}, { &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL}, { &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0}, - { &key_LOCK_gtid_counter, "LOCK_gtid_counter", PSI_FLAG_GLOBAL}, { &key_LOCK_rpl_gtid_state, "LOCK_rpl_gtid_state", PSI_FLAG_GLOBAL}, { &key_LOCK_thd_data, "THD::LOCK_thd_data", 0}, { &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL}, @@ -1394,10 +1393,7 @@ struct st_VioSSLFd *ssl_acceptor_fd; */ uint connection_count= 0, extra_connection_count= 0; -/** - Running counter for generating new GTIDs locally. -*/ -uint64 global_gtid_counter= 0; +my_bool opt_gtid_strict_mode= FALSE; /* Function declarations */ @@ -2073,7 +2069,6 @@ static void clean_up_mutexes() mysql_mutex_destroy(&LOCK_global_user_client_stats); mysql_mutex_destroy(&LOCK_global_table_stats); mysql_mutex_destroy(&LOCK_global_index_stats); - mysql_mutex_destroy(&LOCK_gtid_counter); mysql_mutex_destroy(&LOCK_rpl_gtid_state); #ifdef HAVE_OPENSSL mysql_mutex_destroy(&LOCK_des_key_file); @@ -4250,8 +4245,6 @@ static int init_thread_environment() &LOCK_global_table_stats, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_global_index_stats, &LOCK_global_index_stats, MY_MUTEX_INIT_FAST); - mysql_mutex_init(key_LOCK_gtid_counter, - &LOCK_gtid_counter, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_rpl_gtid_state, &LOCK_rpl_gtid_state, MY_MUTEX_INIT_SLOW); mysql_mutex_init(key_LOCK_prepare_ordered, &LOCK_prepare_ordered, diff --git a/sql/mysqld.h b/sql/mysqld.h index 0717bcff718..e07b5d5c41c 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -255,7 +255,7 @@ extern PSI_mutex_key key_LOCK_stats, key_LOCK_global_user_client_stats, key_LOCK_global_table_stats, key_LOCK_global_index_stats, key_LOCK_wakeup_ready; -extern PSI_mutex_key key_LOCK_gtid_counter, key_LOCK_rpl_gtid_state; +extern PSI_mutex_key key_LOCK_rpl_gtid_state; extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger, key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave, @@ -476,7 +476,7 @@ extern mysql_mutex_t LOCK_slave_list, LOCK_active_mi, LOCK_manager, LOCK_global_system_variables, LOCK_user_conn, LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count; -extern mysql_mutex_t LOCK_gtid_counter, LOCK_rpl_gtid_state; +extern mysql_mutex_t LOCK_rpl_gtid_state; extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count; #ifdef HAVE_OPENSSL extern mysql_mutex_t LOCK_des_key_file; @@ -677,6 +677,7 @@ extern handlerton *maria_hton; extern uint extra_connection_count; extern uint64 global_gtid_counter; +extern my_bool opt_gtid_strict_mode; extern my_bool opt_userstat_running, debug_assert_if_crashed_table; extern uint mysqld_extra_port; extern ulong opt_progress_report_time; diff --git a/sql/opt_range.cc b/sql/opt_range.cc index f20e3ed7739..37a7944bbaf 100644 --- a/sql/opt_range.cc +++ b/sql/opt_range.cc @@ -5919,8 +5919,8 @@ ha_rows records_in_index_intersect_extension(PARTIAL_INDEX_INTERSECT_INFO *curr, ha_rows ext_records= ext_index_scan->records; if (i < used_key_parts) { - ulong f1= key_info->actual_rec_per_key(i-1); - ulong f2= key_info->actual_rec_per_key(i); + double f1= key_info->actual_rec_per_key(i-1); + double f2= key_info->actual_rec_per_key(i); ext_records= (ha_rows) ((double) ext_records / f2 * f1); } if (ext_records < table_cardinality) @@ -13157,11 +13157,11 @@ void cost_group_min_max(TABLE* table, KEY *index_info, uint used_key_parts, double *read_cost, ha_rows *records) { ha_rows table_records; - uint num_groups; - uint num_blocks; - uint keys_per_block; - uint keys_per_group; - uint keys_per_subgroup; /* Average number of keys in sub-groups */ + ha_rows num_groups; + ha_rows num_blocks; + uint keys_per_block; + ha_rows keys_per_group; + ha_rows keys_per_subgroup; /* Average number of keys in sub-groups */ /* formed by a key infix. */ double p_overlap; /* Probability that a sub-group overlaps two blocks. */ double quick_prefix_selectivity; @@ -13170,24 +13170,24 @@ void cost_group_min_max(TABLE* table, KEY *index_info, uint used_key_parts, DBUG_ENTER("cost_group_min_max"); table_records= table->stat_records(); - keys_per_block= (table->file->stats.block_size / 2 / - (index_info->key_length + table->file->ref_length) - + 1); - num_blocks= (uint)(table_records / keys_per_block) + 1; + keys_per_block= (uint) (table->file->stats.block_size / 2 / + (index_info->key_length + table->file->ref_length) + + 1); + num_blocks= (ha_rows)(table_records / keys_per_block) + 1; /* Compute the number of keys in a group. */ - keys_per_group= index_info->actual_rec_per_key(group_key_parts - 1); + keys_per_group= (ha_rows) index_info->actual_rec_per_key(group_key_parts - 1); if (keys_per_group == 0) /* If there is no statistics try to guess */ /* each group contains 10% of all records */ - keys_per_group= (uint)(table_records / 10) + 1; - num_groups= (uint)(table_records / keys_per_group) + 1; + keys_per_group= (table_records / 10) + 1; + num_groups= (table_records / keys_per_group) + 1; /* Apply the selectivity of the quick select for group prefixes. */ if (range_tree && (quick_prefix_records != HA_POS_ERROR)) { quick_prefix_selectivity= (double) quick_prefix_records / (double) table_records; - num_groups= (uint) rint(num_groups * quick_prefix_selectivity); + num_groups= (ha_rows) rint(num_groups * quick_prefix_selectivity); set_if_bigger(num_groups, 1); } @@ -13196,7 +13196,7 @@ void cost_group_min_max(TABLE* table, KEY *index_info, uint used_key_parts, Compute the probability that two ends of a subgroup are inside different blocks. */ - keys_per_subgroup= index_info->actual_rec_per_key(used_key_parts - 1); + keys_per_subgroup= (ha_rows) index_info->actual_rec_per_key(used_key_parts - 1); if (keys_per_subgroup >= keys_per_block) /* If a subgroup is bigger than */ p_overlap= 1.0; /* a block, it will overlap at least two blocks. */ else @@ -13224,9 +13224,9 @@ void cost_group_min_max(TABLE* table, KEY *index_info, uint used_key_parts, *records= num_groups; DBUG_PRINT("info", - ("table rows: %lu keys/block: %u keys/group: %u result rows: %lu blocks: %u", - (ulong)table_records, keys_per_block, keys_per_group, - (ulong) *records, num_blocks)); + ("table rows: %lu keys/block: %u keys/group: %lu result rows: %lu blocks: %lu", + (ulong)table_records, keys_per_block, (ulong) keys_per_group, + (ulong) *records, (ulong) num_blocks)); DBUG_VOID_RETURN; } diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index d6a6ed90bd3..b34b890060b 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -29,7 +29,7 @@ const LEX_STRING rpl_gtid_slave_state_table_name= - { C_STRING_WITH_LEN("rpl_slave_state") }; + { C_STRING_WITH_LEN("gtid_slave_pos") }; void @@ -73,7 +73,7 @@ rpl_slave_state::record_and_update_gtid(THD *thd, Relay_log_info *rli) if ((sub_id= rli->gtid_sub_id)) { rli->gtid_sub_id= 0; - if (record_gtid(thd, &rli->current_gtid, sub_id, false)) + if (record_gtid(thd, &rli->current_gtid, sub_id, false, false)) return 1; update_state_hash(sub_id, &rli->current_gtid); } @@ -186,8 +186,6 @@ rpl_slave_state::truncate_state_table(THD *thd) int err= 0; TABLE *table; - mysql_reset_thd_for_next_command(thd, 0); - tlist.init_one_table(STRING_WITH_LEN("mysql"), rpl_gtid_slave_state_table_name.str, rpl_gtid_slave_state_table_name.length, @@ -234,7 +232,7 @@ static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= { static const uint mysql_rpl_slave_state_pk_parts[]= {0, 1}; -static const TABLE_FIELD_DEF mysql_rpl_slave_state_tabledef= { +static const TABLE_FIELD_DEF mysql_gtid_slave_pos_tabledef= { array_elements(mysql_rpl_slave_state_coltypes), mysql_rpl_slave_state_coltypes, array_elements(mysql_rpl_slave_state_pk_parts), @@ -256,14 +254,14 @@ protected: static Gtid_db_intact gtid_table_intact; /* - Check that the mysql.rpl_slave_state table has the correct definition. + Check that the mysql.gtid_slave_pos table has the correct definition. */ int gtid_check_rpl_slave_state_table(TABLE *table) { int err; - if ((err= gtid_table_intact.check(table, &mysql_rpl_slave_state_tabledef))) + if ((err= gtid_table_intact.check(table, &mysql_gtid_slave_pos_tabledef))) my_error(ER_GTID_OPEN_TABLE_FAILED, MYF(0), "mysql", rpl_gtid_slave_state_table_name.str); return err; @@ -286,7 +284,7 @@ gtid_check_rpl_slave_state_table(TABLE *table) */ int rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, - bool in_transaction) + bool in_transaction, bool in_statement) { TABLE_LIST tlist; int err= 0; @@ -297,7 +295,8 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, ulonglong thd_saved_option= thd->variables.option_bits; Query_tables_list lex_backup; - mysql_reset_thd_for_next_command(thd, 0); + if (!in_statement) + mysql_reset_thd_for_next_command(thd, 0); DBUG_EXECUTE_IF("gtid_inject_record_gtid", { @@ -371,7 +370,10 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, } table->file->ha_index_end(); - mysql_bin_log.bump_seq_no_counter_if_needed(gtid->seq_no); + if(!err && opt_bin_log && + (err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id, + gtid->seq_no))) + my_error(ER_OUT_OF_RESOURCES, MYF(0)); end: @@ -626,7 +628,7 @@ gtid_parser_helper(char **ptr, char *end, rpl_gtid *out_gtid) */ int rpl_slave_state::load(THD *thd, char *state_from_master, size_t len, - bool reset) + bool reset, bool in_statement) { char *end= state_from_master + len; @@ -645,7 +647,7 @@ rpl_slave_state::load(THD *thd, char *state_from_master, size_t len, if (gtid_parser_helper(&state_from_master, end, >id) || !(sub_id= next_subid(gtid.domain_id)) || - record_gtid(thd, >id, sub_id, false) || + record_gtid(thd, >id, sub_id, false, in_statement) || update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no)) return 1; if (state_from_master == end) @@ -686,6 +688,7 @@ rpl_binlog_state::rpl_binlog_state() sizeof(uint32), NULL, my_free, HASH_UNIQUE); mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state, MY_MUTEX_INIT_SLOW); + initialized= 1; } @@ -699,11 +702,36 @@ rpl_binlog_state::reset() my_hash_reset(&hash); } -rpl_binlog_state::~rpl_binlog_state() +void rpl_binlog_state::free() +{ + if (initialized) + { + initialized= 0; + reset(); + my_hash_free(&hash); + mysql_mutex_destroy(&LOCK_binlog_state); + } +} + + +bool +rpl_binlog_state::load(struct rpl_gtid *list, uint32 count) { + uint32 i; + reset(); - my_hash_free(&hash); - mysql_mutex_destroy(&LOCK_binlog_state); + for (i= 0; i < count; ++i) + { + if (update(&(list[i]), false)) + return true; + } + return false; +} + + +rpl_binlog_state::~rpl_binlog_state() +{ + free(); } @@ -716,48 +744,111 @@ rpl_binlog_state::~rpl_binlog_state() Returns 0 for ok, 1 for error. */ int -rpl_binlog_state::update(const struct rpl_gtid *gtid) +rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict) { - rpl_gtid *lookup_gtid; element *elem; - elem= (element *)my_hash_search(&hash, (const uchar *)(>id->domain_id), 0); - if (elem) + if ((elem= (element *)my_hash_search(&hash, + (const uchar *)(>id->domain_id), 0))) { - /* - By far the most common case is that successive events within same - replication domain have the same server id (it changes only when - switching to a new master). So save a hash lookup in this case. - */ - if (likely(elem->last_gtid->server_id == gtid->server_id)) + if (strict && elem->last_gtid && elem->last_gtid->seq_no >= gtid->seq_no) { - elem->last_gtid->seq_no= gtid->seq_no; - return 0; + my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), gtid->domain_id, + gtid->server_id, gtid->seq_no, elem->last_gtid->domain_id, + elem->last_gtid->server_id, elem->last_gtid->seq_no); + return 1; } + if (elem->seq_no_counter < gtid->seq_no) + elem->seq_no_counter= gtid->seq_no; + if (!elem->update_element(gtid)) + return 0; + } + else if (!alloc_element(gtid)) + return 0; - lookup_gtid= (rpl_gtid *) - my_hash_search(&elem->hash, (const uchar *)>id->server_id, 0); - if (lookup_gtid) - { - lookup_gtid->seq_no= gtid->seq_no; - elem->last_gtid= lookup_gtid; + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; +} + + +/* + Fill in a new GTID, allocating next sequence number, and update state + accordingly. +*/ +int +rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id, + rpl_gtid *gtid) +{ + element *elem; + + gtid->domain_id= domain_id; + gtid->server_id= server_id; + + if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0))) + { + gtid->seq_no= ++elem->seq_no_counter; + if (!elem->update_element(gtid)) return 0; - } + } + else + { + gtid->seq_no= 1; + if (!alloc_element(gtid)) + return 0; + } - /* Allocate a new GTID and insert it. */ - lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME)); - if (!lookup_gtid) - return 1; - memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid)); - if (my_hash_insert(&elem->hash, (const uchar *)lookup_gtid)) - { - my_free(lookup_gtid); - return 1; - } - elem->last_gtid= lookup_gtid; + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; +} + + +/* Helper functions for update. */ +int +rpl_binlog_state::element::update_element(const rpl_gtid *gtid) +{ + rpl_gtid *lookup_gtid; + + /* + By far the most common case is that successive events within same + replication domain have the same server id (it changes only when + switching to a new master). So save a hash lookup in this case. + */ + if (likely(last_gtid && last_gtid->server_id == gtid->server_id)) + { + last_gtid->seq_no= gtid->seq_no; return 0; } + lookup_gtid= (rpl_gtid *) + my_hash_search(&hash, (const uchar *)>id->server_id, 0); + if (lookup_gtid) + { + lookup_gtid->seq_no= gtid->seq_no; + last_gtid= lookup_gtid; + return 0; + } + + /* Allocate a new GTID and insert it. */ + lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME)); + if (!lookup_gtid) + return 1; + memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid)); + if (my_hash_insert(&hash, (const uchar *)lookup_gtid)) + { + my_free(lookup_gtid); + return 1; + } + last_gtid= lookup_gtid; + return 0; +} + + +int +rpl_binlog_state::alloc_element(const rpl_gtid *gtid) +{ + element *elem; + rpl_gtid *lookup_gtid; + /* First time we see this domain_id; allocate a new element. */ elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME)); lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME)); @@ -768,6 +859,7 @@ rpl_binlog_state::update(const struct rpl_gtid *gtid) offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free, HASH_UNIQUE); elem->last_gtid= lookup_gtid; + elem->seq_no_counter= gtid->seq_no; memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid)); if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid)) { @@ -787,23 +879,64 @@ rpl_binlog_state::update(const struct rpl_gtid *gtid) } -uint64 -rpl_binlog_state::seq_no_from_state() +/* + Check that a new GTID can be logged without creating an out-of-order + sequence number with existing GTIDs. +*/ +bool +rpl_binlog_state::check_strict_sequence(uint32 domain_id, uint32 server_id, + uint64 seq_no) { - ulong i, j; - uint64 seq_no= 0; + element *elem; - for (i= 0; i < hash.records; ++i) + if ((elem= (element *)my_hash_search(&hash, + (const uchar *)(&domain_id), 0)) && + elem->last_gtid && elem->last_gtid->seq_no >= seq_no) { - element *e= (element *)my_hash_element(&hash, i); - for (j= 0; j < e->hash.records; ++j) - { - const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&e->hash, j); - if (gtid->seq_no > seq_no) - seq_no= gtid->seq_no; - } + my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), domain_id, server_id, seq_no, + elem->last_gtid->domain_id, elem->last_gtid->server_id, + elem->last_gtid->seq_no); + return 1; } - return seq_no; + return 0; +} + + +/* + When we see a new GTID that will not be binlogged (eg. slave thread + with --log-slave-updates=0), then we need to remember to allocate any + GTID seq_no of our own within that domain starting from there. + + Returns 0 if ok, non-zero if out-of-memory. +*/ +int +rpl_binlog_state::bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no) +{ + element *elem; + + if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0))) + { + if (elem->seq_no_counter < seq_no) + elem->seq_no_counter= seq_no; + return 0; + } + + /* We need to allocate a new, empty element to remember the next seq_no. */ + if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME)))) + return 1; + + elem->domain_id= domain_id; + my_hash_init(&elem->hash, &my_charset_bin, 32, + offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free, + HASH_UNIQUE); + elem->last_gtid= NULL; + elem->seq_no_counter= seq_no; + if (0 == my_hash_insert(&hash, (const uchar *)elem)) + return 0; + + my_hash_free(&elem->hash); + my_free(elem); + return 1; } @@ -824,6 +957,11 @@ rpl_binlog_state::write_to_iocache(IO_CACHE *dest) { size_t res; element *e= (element *)my_hash_element(&hash, i); + if (!e->last_gtid) + { + DBUG_ASSERT(e->hash.records == 0); + continue; + } for (j= 0; j <= e->hash.records; ++j) { const rpl_gtid *gtid; @@ -865,7 +1003,7 @@ rpl_binlog_state::read_from_iocache(IO_CACHE *src) end= buf + res; if (gtid_parser_helper(&p, end, >id)) return 1; - if (update(>id)) + if (update(>id, false)) return 1; } return 0; @@ -881,6 +1019,17 @@ rpl_binlog_state::find(uint32 domain_id, uint32 server_id) return (rpl_gtid *)my_hash_search(&elem->hash, (const uchar *)&server_id, 0); } +rpl_gtid * +rpl_binlog_state::find_most_recent(uint32 domain_id) +{ + element *elem; + + elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0); + if (elem && elem->last_gtid) + return elem->last_gtid; + return NULL; +} + uint32 rpl_binlog_state::count() @@ -904,6 +1053,11 @@ rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size) for (i= 0; i < hash.records; ++i) { element *e= (element *)my_hash_element(&hash, i); + if (!e->last_gtid) + { + DBUG_ASSERT(e->hash.records==0); + continue; + } for (j= 0; j <= e->hash.records; ++j) { const rpl_gtid *gtid; @@ -940,20 +1094,44 @@ int rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size) { uint32 i; + uint32 alloc_size, out_size; - *size= hash.records; - if (!(*list= (rpl_gtid *)my_malloc(*size * sizeof(rpl_gtid), MYF(MY_WME)))) + alloc_size= hash.records; + if (!(*list= (rpl_gtid *)my_malloc(alloc_size * sizeof(rpl_gtid), + MYF(MY_WME)))) return 1; - for (i= 0; i < *size; ++i) + out_size= 0; + for (i= 0; i < alloc_size; ++i) { element *e= (element *)my_hash_element(&hash, i); - memcpy(&((*list)[i]), e->last_gtid, sizeof(rpl_gtid)); + if (!e->last_gtid) + continue; + memcpy(&((*list)[out_size++]), e->last_gtid, sizeof(rpl_gtid)); } + *size= out_size; return 0; } +bool +rpl_binlog_state::append_pos(String *str) +{ + uint32 i; + bool first= true; + + for (i= 0; i < hash.records; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + if (e->last_gtid && + rpl_slave_state_tostring_helper(str, e->last_gtid, &first)) + return true; + } + + return false; +} + + slave_connection_state::slave_connection_state() { my_hash_init(&hash, &my_charset_bin, 32, @@ -1107,10 +1285,17 @@ slave_connection_state::remove(const rpl_gtid *in_gtid) int slave_connection_state::to_string(String *out_str) { + out_str->length(0); + return append_to_string(out_str); +} + + +int +slave_connection_state::append_to_string(String *out_str) +{ uint32 i; bool first; - out_str->length(0); first= true; for (i= 0; i < hash.records; ++i) { diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index e63d8439803..fefce684c2c 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -91,11 +91,12 @@ struct rpl_slave_state int update(uint32 domain_id, uint32 server_id, uint64 sub_id, uint64 seq_no); int truncate_state_table(THD *thd); int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, - bool in_transaction); + bool in_transaction, bool in_statement); uint64 next_subid(uint32 domain_id); int tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra); bool domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid); - int load(THD *thd, char *state_from_master, size_t len, bool reset); + int load(THD *thd, char *state_from_master, size_t len, bool reset, + bool in_statement); bool is_empty(); void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); } @@ -130,24 +131,37 @@ struct rpl_binlog_state HASH hash; /* Containing all server_id for one domain_id */ /* The most recent entry in the hash. */ rpl_gtid *last_gtid; + /* Counter to allocate next seq_no for this domain. */ + uint64 seq_no_counter; + + int update_element(const rpl_gtid *gtid); }; /* Mapping from domain_id to collection of elements. */ HASH hash; /* Mutex protecting access to the state. */ mysql_mutex_t LOCK_binlog_state; + my_bool initialized; rpl_binlog_state(); ~rpl_binlog_state(); void reset(); - int update(const struct rpl_gtid *gtid); - uint64 seq_no_from_state(); + void free(); + bool load(struct rpl_gtid *list, uint32 count); + int update(const struct rpl_gtid *gtid, bool strict); + int update_with_next_gtid(uint32 domain_id, uint32 server_id, + rpl_gtid *gtid); + int alloc_element(const rpl_gtid *gtid); + bool check_strict_sequence(uint32 domain_id, uint32 server_id, uint64 seq_no); + int bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no); int write_to_iocache(IO_CACHE *dest); int read_from_iocache(IO_CACHE *src); uint32 count(); int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size); int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size); + bool append_pos(String *str); rpl_gtid *find(uint32 domain_id, uint32 server_id); + rpl_gtid *find_most_recent(uint32 domain_id); }; @@ -170,6 +184,7 @@ struct slave_connection_state void remove(const rpl_gtid *gtid); ulong count() const { return hash.records; } int to_string(String *out_str); + int append_to_string(String *out_str); }; extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index d7ef562a5ff..a68fd055d74 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -37,7 +37,8 @@ Master_info::Master_info(LEX_STRING *connection_name_arg, checksum_alg_before_fd(BINLOG_CHECKSUM_ALG_UNDEF), connect_retry(DEFAULT_CONNECT_RETRY), inited(0), abort_slave(0), slave_running(0), slave_run_id(0), sync_counter(0), - heartbeat_period(0), received_heartbeats(0), master_id(0), using_gtid(0) + heartbeat_period(0), received_heartbeats(0), master_id(0), + using_gtid(USE_GTID_NO) { host[0] = 0; user[0] = 0; password[0] = 0; ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0; @@ -153,7 +154,7 @@ void init_master_log_pos(Master_info* mi) mi->master_log_name[0] = 0; mi->master_log_pos = BIN_LOG_HEADER_SIZE; // skip magic number - mi->using_gtid= false; + mi->using_gtid= Master_info::USE_GTID_NO; /* Intentionally init ssl_verify_server_cert to 0, no option available */ mi->ssl_verify_server_cert= 0; @@ -481,7 +482,15 @@ file '%s')", fname); while (!init_strvar_from_file(buf, sizeof(buf), &mi->file, 0)) { if (0 == strncmp(buf, STRING_WITH_LEN("using_gtid="))) - mi->using_gtid= (0 != atoi(buf + sizeof("using_gtid"))); + { + int val= atoi(buf + sizeof("using_gtid")); + if (val == Master_info::USE_GTID_CURRENT_POS) + mi->using_gtid= Master_info::USE_GTID_CURRENT_POS; + else if (val == Master_info::USE_GTID_SLAVE_POS) + mi->using_gtid= Master_info::USE_GTID_SLAVE_POS; + else + mi->using_gtid= Master_info::USE_GTID_NO; + } } } } diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index 9958f9a5cea..7e3709993ed 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -131,10 +131,14 @@ class Master_info : public Slave_reporting_capability DYNAMIC_ARRAY ignore_server_ids; ulong master_id; /* - True if slave position is set using GTID state rather than old-style - file/offset binlog position. + Which kind of GTID position (if any) is used when connecting to master. + + Note that you can not change the numeric values of these, they are used + in master.info. */ - bool using_gtid; + enum { + USE_GTID_NO= 0, USE_GTID_CURRENT_POS= 1, USE_GTID_SLAVE_POS= 2 + } using_gtid; }; int init_master_info(Master_info* mi, const char* master_info_fname, const char* slave_info_fname, diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 6add94580a3..543c6d9fffe 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1097,7 +1097,8 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev) ulonglong log_pos; DBUG_ENTER("Relay_log_info::is_until_satisfied"); - DBUG_ASSERT(until_condition != UNTIL_NONE); + DBUG_ASSERT(until_condition == UNTIL_MASTER_POS || + until_condition == UNTIL_RELAY_POS); if (until_condition == UNTIL_MASTER_POS) { @@ -1397,7 +1398,6 @@ rpl_load_gtid_slave_state(THD *thd) HASH hash; int err= 0; uint32 i; - uint64 highest_seq_no= 0; DBUG_ENTER("rpl_load_gtid_slave_state"); rpl_global_gtid_slave_state.lock(); @@ -1450,8 +1450,6 @@ rpl_load_gtid_slave_state(THD *thd) DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu\n", (unsigned)domain_id, (unsigned)server_id, (ulong)seq_no, (ulong)sub_id)); - if (seq_no > highest_seq_no) - highest_seq_no= seq_no; if ((rec= my_hash_search(&hash, (const uchar *)&domain_id, 0))) { @@ -1495,6 +1493,14 @@ rpl_load_gtid_slave_state(THD *thd) rpl_global_gtid_slave_state.unlock(); goto end; } + if (opt_bin_log && + mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id, + entry->gtid.seq_no)) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + rpl_global_gtid_slave_state.unlock(); + goto end; + } } rpl_global_gtid_slave_state.loaded= true; rpl_global_gtid_slave_state.unlock(); @@ -1514,7 +1520,6 @@ end: thd->mdl_context.release_transactional_locks(); } my_hash_free(&hash); - mysql_bin_log.bump_seq_no_counter_if_needed(highest_seq_no); DBUG_RETURN(err); } diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 7aff6720aac..6dd757343fd 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -263,7 +263,9 @@ public: thread is running). */ - enum {UNTIL_NONE= 0, UNTIL_MASTER_POS, UNTIL_RELAY_POS} until_condition; + enum { + UNTIL_NONE= 0, UNTIL_MASTER_POS, UNTIL_RELAY_POS, UNTIL_GTID + } until_condition; char until_log_name[FN_REFLEN]; ulonglong until_log_pos; /* extension extracted from log_name and converted to int */ @@ -277,6 +279,8 @@ public: UNTIL_LOG_NAMES_CMP_UNKNOWN= -2, UNTIL_LOG_NAMES_CMP_LESS= -1, UNTIL_LOG_NAMES_CMP_EQUAL= 0, UNTIL_LOG_NAMES_CMP_GREATER= 1 } until_log_names_cmp_result; + /* Condition for UNTIL master_gtid_pos. */ + slave_connection_state until_gtid_pos; char cached_charset[6]; /* @@ -354,6 +358,8 @@ public: bool is_until_satisfied(THD *thd, Log_event *ev); inline ulonglong until_pos() { + DBUG_ASSERT(until_condition == UNTIL_MASTER_POS || + until_condition == UNTIL_RELAY_POS); return ((until_condition == UNTIL_MASTER_POS) ? group_master_log_pos : group_relay_log_pos); } diff --git a/sql/set_var.cc b/sql/set_var.cc index 18d86a13998..363f03579ef 100644 --- a/sql/set_var.cc +++ b/sql/set_var.cc @@ -311,7 +311,7 @@ longlong sys_var::val_int(bool *is_null, { case_get_string_as_lex_string; case_for_integers(return val); - case_for_double(return val); + case_for_double(return (longlong) val); default: my_error(ER_VAR_CANT_BE_READ, MYF(0), name.str); return 0; diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt index 95caf1f43e5..ad055273155 100644 --- a/sql/share/errmsg-utf8.txt +++ b/sql/share/errmsg-utf8.txt @@ -6762,6 +6762,12 @@ ER_GTID_POSITION_NOT_FOUND_IN_BINLOG ER_CANNOT_LOAD_SLAVE_GTID_STATE eng "Failed to load replication slave GTID state from table %s.%s" ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG - eng "Requested GTID_POS %u-%u-%llu conflicts with the binary log which contains a more recent GTID %u-%u-%llu. To use the requested GTID_POS, the old binlog must be removed with RESET MASTER to avoid out-of-order binlog" + eng "Specified GTID %u-%u-%llu conflicts with the binary log which contains a more recent GTID %u-%u-%llu. If MASTER_GTID_POS=CURRENT_POS is used, the binlog position will override the new value of @@gtid_slave_pos." ER_MASTER_GTID_POS_MISSING_DOMAIN - eng "Requested GTID_POS contains no value for replication domain %u. This conflicts with the binary log which contains GTID %u-%u-%llu. To use the requested GTID_POS, the old binlog must be removed with RESET MASTER to avoid out-of-order binlog" + eng "Specified value for @@gtid_slave_pos contains no value for replication domain %u. This conflicts with the binary log which contains GTID %u-%u-%llu. If MASTER_GTID_POS=CURRENT_POS is used, the binlog position will override the new value of @@gtid_slave_pos." +ER_UNTIL_REQUIRES_USING_GTID + eng "START SLAVE UNTIL master_gtid_pos requires that slave is using GTID" +ER_GTID_STRICT_OUT_OF_ORDER + eng "An attempt was made to binlog GTID %u-%u-%llu which would create an out-of-order sequence number with existing GTID %u-%u-%llu, and gtid strict mode is enabled." +ER_GTID_START_FROM_BINLOG_HOLE + eng "The binlog on the master is missing the GTID %u-%u-%llu requested by the slave (even though both a prior and a subsequent sequence number does exist), and GTID strict mode is enabled" diff --git a/sql/slave.cc b/sql/slave.cc index 407b4a73a72..c7f4dc08096 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -824,7 +824,8 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, while one of the threads is running, they are in use and cannot be removed. */ - if (mi->using_gtid && !mi->slave_running && !mi->rli.slave_running) + if (mi->using_gtid != Master_info::USE_GTID_NO && + !mi->slave_running && !mi->rli.slave_running) { purge_relay_logs(&mi->rli, NULL, 0, &errmsg); mi->master_log_name[0]= 0; @@ -1833,12 +1834,12 @@ after_set_capability: restart or reconnect, we might end up re-fetching and hence re-applying the same event(s) again. */ - if (mi->using_gtid && !mi->master_log_name[0]) + if (mi->using_gtid != Master_info::USE_GTID_NO && !mi->master_log_name[0]) { int rc; char str_buf[256]; - String connect_state(str_buf, sizeof(str_buf), system_charset_info); - connect_state.length(0); + String query_str(str_buf, sizeof(str_buf), system_charset_info); + query_str.length(0); /* Read the master @@GLOBAL.gtid_domain_id variable. @@ -1861,9 +1862,11 @@ after_set_capability: mysql_free_result(master_res); master_res= NULL; - connect_state.append(STRING_WITH_LEN("SET @slave_connect_state='"), - system_charset_info); - if (rpl_append_gtid_state(&connect_state, true)) + query_str.append(STRING_WITH_LEN("SET @slave_connect_state='"), + system_charset_info); + if (rpl_append_gtid_state(&query_str, + mi->using_gtid == + Master_info::USE_GTID_CURRENT_POS)) { err_code= ER_OUTOFMEMORY; errmsg= "The slave I/O thread stops because a fatal out-of-memory " @@ -1871,9 +1874,9 @@ after_set_capability: sprintf(err_buff, "%s Error: Out of memory", errmsg); goto err; } - connect_state.append(STRING_WITH_LEN("'"), system_charset_info); + query_str.append(STRING_WITH_LEN("'"), system_charset_info); - rc= mysql_real_query(mysql, connect_state.ptr(), connect_state.length()); + rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); if (rc) { err_code= mysql_errno(mysql); @@ -1893,8 +1896,78 @@ after_set_capability: goto err; } } + + query_str.length(0); + if (query_str.append(STRING_WITH_LEN("SET @slave_gtid_strict_mode="), + system_charset_info) || + query_str.append_ulonglong(opt_gtid_strict_mode != false)) + { + err_code= ER_OUTOFMEMORY; + errmsg= "The slave I/O thread stops because a fatal out-of-memory " + "error is encountered when it tries to set @slave_gtid_strict_mode."; + sprintf(err_buff, "%s Error: Out of memory", errmsg); + goto err; + } + + rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); + if (rc) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, + "Setting @slave_gtid_strict_mode failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @slave_gtid_strict_mode."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + + if (mi->rli.until_condition == Relay_log_info::UNTIL_GTID) + { + query_str.length(0); + query_str.append(STRING_WITH_LEN("SET @slave_until_gtid='"), + system_charset_info); + if (mi->rli.until_gtid_pos.append_to_string(&query_str)) + { + err_code= ER_OUTOFMEMORY; + errmsg= "The slave I/O thread stops because a fatal out-of-memory " + "error is encountered when it tries to compute @slave_until_gtid."; + sprintf(err_buff, "%s Error: Out of memory", errmsg); + goto err; + } + query_str.append(STRING_WITH_LEN("'"), system_charset_info); + + rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); + if (rc) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, + "Setting @slave_until_gtid failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @slave_until_gtid."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + } } - if (!mi->using_gtid) + if (mi->using_gtid == Master_info::USE_GTID_NO) { /* If we are not using GTID to connect this time, then instead request @@ -1920,7 +1993,7 @@ after_set_capability: (master_row[0] != NULL)) { rpl_global_gtid_slave_state.load(mi->io_thd, master_row[0], - strlen(master_row[0]), false); + strlen(master_row[0]), false, false); } else if (check_io_slave_killed(mi->io_thd, mi, NULL)) goto slave_killed_err; @@ -2288,8 +2361,8 @@ static bool send_show_master_info_header(THD *thd, bool full, sizeof(mi->ssl_crl))); field_list.push_back(new Item_empty_string("Master_SSL_Crlpath", sizeof(mi->ssl_crlpath))); - field_list.push_back(new Item_return_int("Using_Gtid", sizeof(ulong), - MYSQL_TYPE_LONG)); + field_list.push_back(new Item_empty_string("Using_Gtid", + sizeof("Current_Pos")-1)); if (full) { field_list.push_back(new Item_return_int("Retried_transactions", @@ -2302,7 +2375,8 @@ static bool send_show_master_info_header(THD *thd, bool full, 10, MYSQL_TYPE_LONG)); field_list.push_back(new Item_float("Slave_heartbeat_period", 0.0, 3, 10)); - field_list.push_back(new Item_empty_string("Gtid_Pos", gtid_pos_length)); + field_list.push_back(new Item_empty_string("Gtid_Slave_Pos", + gtid_pos_length)); } if (protocol->send_result_set_metadata(&field_list, @@ -2382,7 +2456,8 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, protocol->store( mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None": ( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master": - "Relay"), &my_charset_bin); + ( mi->rli.until_condition==Relay_log_info::UNTIL_RELAY_POS? "Relay": + "Gtid")), &my_charset_bin); protocol->store(mi->rli.until_log_name, &my_charset_bin); protocol->store((ulonglong) mi->rli.until_log_pos); @@ -2473,7 +2548,10 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, protocol->store(mi->ssl_ca, &my_charset_bin); // Master_Ssl_Crlpath protocol->store(mi->ssl_capath, &my_charset_bin); - protocol->store((uint32) (mi->using_gtid != 0)); + protocol->store((mi->using_gtid==Master_info::USE_GTID_NO ? "No" : + (mi->using_gtid==Master_info::USE_GTID_SLAVE_POS ? + "Slave_Pos" : "Current_Pos")), + &my_charset_bin); if (full) { protocol->store((uint32) mi->rli.retried_trans); @@ -3079,7 +3157,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) This tests if the position of the beginning of the current event hits the UNTIL barrier. */ - if (rli->until_condition != Relay_log_info::UNTIL_NONE && + if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS || + rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) && rli->is_until_satisfied(thd, ev)) { char buf[22]; @@ -3976,7 +4055,8 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, saved_master_log_pos= rli->group_master_log_pos; saved_skip= rli->slave_skip_counter; } - if (rli->until_condition != Relay_log_info::UNTIL_NONE && + if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS || + rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) && rli->is_until_satisfied(thd, NULL)) { char buf[22]; @@ -4815,7 +4895,43 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } break; + case GTID_LIST_EVENT: + { + const char *errmsg; + Gtid_list_log_event *glev; + Log_event *tmp; + + if (mi->rli.until_condition != Relay_log_info::UNTIL_GTID) + goto default_action; + if (!(tmp= Log_event::read_log_event(buf, event_len, &errmsg, + mi->rli.relay_log.description_event_for_queue, + opt_slave_sql_verify_checksum))) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + glev= static_cast<Gtid_list_log_event *>(tmp); + if (glev->gl_flags & Gtid_list_log_event::FLAG_UNTIL_REACHED) + { + char str_buf[128]; + String str(str_buf, sizeof(str_buf), system_charset_info); + mi->rli.until_gtid_pos.to_string(&str); + sql_print_information("Slave IO thread stops because it reached its" + " UNTIL master_gtid_pos %s", str.c_ptr_safe()); + mi->abort_slave= true; + } + delete glev; + + /* + Do not update position for fake Gtid_list event (which has a zero + end_log_pos). + */ + inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0; + } + break; + default: + default_action: inc_pos= event_len; break; } diff --git a/sql/sql_join_cache.cc b/sql/sql_join_cache.cc index 6b0882bda80..9fca8730cb5 100644 --- a/sql/sql_join_cache.cc +++ b/sql/sql_join_cache.cc @@ -3812,8 +3812,8 @@ uint JOIN_TAB_SCAN_MRR::aux_buffer_incr(ulong recno) uint incr= 0; TABLE_REF *ref= &join_tab->ref; TABLE *tab= join_tab->table; - uint rec_per_key= - tab->key_info[ref->key].actual_rec_per_key(ref->key_parts-1); + ha_rows rec_per_key= + (ha_rows) tab->key_info[ref->key].actual_rec_per_key(ref->key_parts-1); set_if_bigger(rec_per_key, 1); if (recno == 1) incr= ref->key_length + tab->file->ref_length; diff --git a/sql/sql_lex.h b/sql/sql_lex.h index 50afc657ee6..9cdc8d7d0b6 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -210,6 +210,8 @@ struct LEX_MASTER_INFO char *ssl_crl, *ssl_crlpath; char *relay_log_name; LEX_STRING connection_name; + /* Value in START SLAVE UNTIL master_gtid_pos=xxx */ + LEX_STRING gtid_pos_str; ulonglong pos; ulong relay_log_pos; ulong server_id; @@ -220,8 +222,10 @@ struct LEX_MASTER_INFO changed variable or if it should be left at old value */ enum {LEX_MI_UNCHANGED, LEX_MI_DISABLE, LEX_MI_ENABLE} - ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt, - use_gtid_opt; + ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt; + enum { + LEX_GTID_UNCHANGED, LEX_GTID_NO, LEX_GTID_CURRENT_POS, LEX_GTID_SLAVE_POS + } use_gtid_opt; void init() { @@ -237,7 +241,10 @@ struct LEX_MASTER_INFO pos= relay_log_pos= server_id= port= connect_retry= 0; heartbeat_period= 0; ssl= ssl_verify_server_cert= heartbeat_opt= - repl_ignore_server_ids_opt= use_gtid_opt= LEX_MI_UNCHANGED; + repl_ignore_server_ids_opt= LEX_MI_UNCHANGED; + gtid_pos_str.length= 0; + gtid_pos_str.str= NULL; + use_gtid_opt= LEX_GTID_UNCHANGED; } }; diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 100d3c9fe85..9e3da1ce641 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -30,6 +30,14 @@ #include "rpl_handler.h" #include "debug_sync.h" + +enum enum_gtid_until_state { + GTID_UNTIL_NOT_DONE, + GTID_UNTIL_STOP_AFTER_STANDALONE, + GTID_UNTIL_STOP_AFTER_TRANSACTION +}; + + int max_binlog_dump_events = 0; // unlimited my_bool opt_sporadic_binlog_dump_fail = 0; #ifndef DBUG_OFF @@ -38,6 +46,74 @@ static int binlog_dump_count = 0; extern TYPELIB binlog_checksum_typelib; + +static int +fake_event_header(String* packet, Log_event_type event_type, ulong extra_len, + my_bool *do_checksum, ha_checksum *crc, const char** errmsg, + uint8 checksum_alg_arg) +{ + char header[LOG_EVENT_HEADER_LEN]; + ulong event_len; + + *do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF && + checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF; + + /* + 'when' (the timestamp) is set to 0 so that slave could distinguish between + real and fake Rotate events (if necessary) + */ + memset(header, 0, 4); + header[EVENT_TYPE_OFFSET] = (uchar)event_type; + event_len= LOG_EVENT_HEADER_LEN + extra_len + + (*do_checksum ? BINLOG_CHECKSUM_LEN : 0); + int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id); + int4store(header + EVENT_LEN_OFFSET, event_len); + int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F); + // TODO: check what problems this may cause and fix them + int4store(header + LOG_POS_OFFSET, 0); + if (packet->append(header, sizeof(header))) + { + *errmsg= "Failed due to out-of-memory writing event"; + return -1; + } + if (*do_checksum) + { + *crc= my_checksum(0L, NULL, 0); + *crc= my_checksum(*crc, (uchar*)header, sizeof(header)); + } + return 0; +} + + +static int +fake_event_footer(String *packet, my_bool do_checksum, ha_checksum crc, const char **errmsg) +{ + if (do_checksum) + { + char b[BINLOG_CHECKSUM_LEN]; + int4store(b, crc); + if (packet->append(b, sizeof(b))) + { + *errmsg= "Failed due to out-of-memory writing event checksum"; + return -1; + } + } + return 0; +} + + +static int +fake_event_write(NET *net, String *packet, const char **errmsg) +{ + if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) + { + *errmsg = "failed on my_net_write()"; + return -1; + } + return 0; +} + + /* fake_rotate_event() builds a fake (=which does not exist physically in any binlog) Rotate event, which contains the name of the binlog we are going to @@ -61,59 +137,71 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, uint8 checksum_alg_arg) { DBUG_ENTER("fake_rotate_event"); - char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100]; - - /* - this Rotate is to be sent with checksum if and only if - slave's get_master_version_and_clock time handshake value - of master's @@global.binlog_checksum was TRUE - */ - - my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF && - checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF; - - /* - 'when' (the timestamp) is set to 0 so that slave could distinguish between - real and fake Rotate events (if necessary) - */ - memset(header, 0, 4); - header[EVENT_TYPE_OFFSET] = ROTATE_EVENT; - + char buf[ROTATE_HEADER_LEN+100]; + my_bool do_checksum; + int err; char* p = log_file_name+dirname_length(log_file_name); uint ident_len = (uint) strlen(p); - ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN + - (do_checksum ? BINLOG_CHECKSUM_LEN : 0); - int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id); - int4store(header + EVENT_LEN_OFFSET, event_len); - int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F); + ha_checksum crc; - // TODO: check what problems this may cause and fix them - int4store(header + LOG_POS_OFFSET, 0); + if ((err= fake_event_header(packet, ROTATE_EVENT, + ident_len + ROTATE_HEADER_LEN, &do_checksum, &crc, + errmsg, checksum_alg_arg))) + DBUG_RETURN(err); - packet->append(header, sizeof(header)); int8store(buf+R_POS_OFFSET,position); packet->append(buf, ROTATE_HEADER_LEN); packet->append(p, ident_len); if (do_checksum) { - char b[BINLOG_CHECKSUM_LEN]; - ha_checksum crc= my_checksum(0L, NULL, 0); - crc= my_checksum(crc, (uchar*)header, sizeof(header)); crc= my_checksum(crc, (uchar*)buf, ROTATE_HEADER_LEN); crc= my_checksum(crc, (uchar*)p, ident_len); - int4store(b, crc); - packet->append(b, sizeof(b)); } - if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) + if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) || + (err= fake_event_write(net, packet, errmsg))) + DBUG_RETURN(err); + + DBUG_RETURN(0); +} + + +static int fake_gtid_list_event(NET* net, String* packet, + Gtid_list_log_event *glev, const char** errmsg, + uint8 checksum_alg_arg) +{ + my_bool do_checksum; + int err; + ha_checksum crc; + char buf[128]; + String str(buf, sizeof(buf), system_charset_info); + + str.length(0); + if (glev->to_packet(&str)) { - *errmsg = "failed on my_net_write()"; - DBUG_RETURN(-1); + *errmsg= "Failed due to out-of-memory writing Gtid_list event"; + return -1; } - DBUG_RETURN(0); + if ((err= fake_event_header(packet, GTID_LIST_EVENT, + str.length(), &do_checksum, &crc, + errmsg, checksum_alg_arg))) + return err; + + packet->append(str); + if (do_checksum) + { + crc= my_checksum(crc, (uchar*)str.ptr(), str.length()); + } + + if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) || + (err= fake_event_write(net, packet, errmsg))) + return err; + + return 0; } + /* Reset thread transmit packet buffer for event sending @@ -526,6 +614,40 @@ get_slave_connect_state(THD *thd, String *out_str) } +static bool +get_slave_gtid_strict_mode(THD *thd) +{ + bool null_value; + + const LEX_STRING name= { C_STRING_WITH_LEN("slave_gtid_strict_mode") }; + user_var_entry *entry= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, + name.length); + return entry && entry->val_int(&null_value) && !null_value; +} + + +/* + Get the value of the @slave_until_gtid user variable into the supplied + String (this is the GTID position specified for START SLAVE UNTIL + master_gtid_pos='xxx'). + + Returns false if error (ie. slave did not set the variable and is not doing + START SLAVE UNTIL mater_gtid_pos='xxx'), true if success. +*/ +static bool +get_slave_until_gtid(THD *thd, String *out_str) +{ + bool null_value; + + const LEX_STRING name= { C_STRING_WITH_LEN("slave_until_gtid") }; + user_var_entry *entry= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, + name.length); + return entry && entry->val_str(&null_value, out_str, 0) && !null_value; +} + + /* Function prepares and sends repliation heartbeat event. @@ -767,16 +889,14 @@ contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev) Give an error if the slave requests something that we do not have in our binlog. - - T */ static int check_slave_start_position(THD *thd, slave_connection_state *st, - const char **errormsg, rpl_gtid *error_gtid) + const char **errormsg, rpl_gtid *error_gtid, + slave_connection_state *until_gtid_state) { uint32 i; - bool found; int err; rpl_gtid **delete_list= NULL; uint32 delete_idx= 0; @@ -791,9 +911,9 @@ check_slave_start_position(THD *thd, slave_connection_state *st, rpl_gtid master_replication_gtid; rpl_gtid start_gtid; - if ((found= mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id, - slave_gtid->server_id, - &master_gtid)) && + if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id, + slave_gtid->server_id, + &master_gtid) && master_gtid.seq_no >= slave_gtid->seq_no) continue; @@ -814,6 +934,7 @@ check_slave_start_position(THD *thd, slave_connection_state *st, slave_gtid->seq_no != master_replication_gtid.seq_no) { rpl_gtid domain_gtid; + rpl_gtid *until_gtid; if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id, &domain_gtid)) @@ -832,6 +953,27 @@ check_slave_start_position(THD *thd, slave_connection_state *st, ++missing_domains; continue; } + + if (until_gtid_state && + ( !(until_gtid= until_gtid_state->find(slave_gtid->domain_id)) || + (mysql_bin_log.find_in_binlog_state(until_gtid->domain_id, + until_gtid->server_id, + &master_gtid) && + master_gtid.seq_no >= until_gtid->seq_no))) + { + /* + The slave requested to start from a position that is not (yet) in + our binlog, but it also specified an UNTIL condition that _is_ in + our binlog (or a missing UNTIL, which means stop at the very + beginning). So the stop position is before the start position, and + we just delete the entry from the UNTIL hash to mark that this + domain has already reached the UNTIL condition. + */ + if(until_gtid) + until_gtid_state->remove(until_gtid); + continue; + } + *errormsg= "Requested slave GTID state not found in binlog"; *error_gtid= *slave_gtid; err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG; @@ -951,7 +1093,8 @@ end: the requested GTID that was already purged. */ static const char * -gtid_find_binlog_file(slave_connection_state *state, char *out_name) +gtid_find_binlog_file(slave_connection_state *state, char *out_name, + slave_connection_state *until_gtid_state) { MEM_ROOT memroot; binlog_file_entry *list; @@ -1003,42 +1146,60 @@ gtid_find_binlog_file(slave_connection_state *state, char *out_name) if (!glev || contains_all_slave_gtid(state, glev)) { - uint32 i; - strmake(out_name, buf, FN_REFLEN); - /* - As a special case, we allow to start from binlog file N if the - requested GTID is the last event (in the corresponding domain) in - binlog file (N-1), but then we need to remove that GTID from the slave - state, rather than skipping events waiting for it to turn up. - */ - for (i= 0; i < glev->count; ++i) + if (glev) { - const rpl_gtid *gtid= state->find(glev->list[i].domain_id); - if (!gtid) - { - /* - contains_all_slave_gtid() returns false if there is any domain in - Gtid_list_event which is not in the requested slave position. + uint32 i; - We may delete a domain from the slave state inside this loop, but - we only do this when it is the very last GTID logged for that - domain in earlier binlogs, and then we can not encounter it in any - further GTIDs in the Gtid_list. - */ - DBUG_ASSERT(0); - continue; - } - if (gtid->server_id == glev->list[i].server_id && - gtid->seq_no == glev->list[i].seq_no) + /* + As a special case, we allow to start from binlog file N if the + requested GTID is the last event (in the corresponding domain) in + binlog file (N-1), but then we need to remove that GTID from the slave + state, rather than skipping events waiting for it to turn up. + + If slave is doing START SLAVE UNTIL, check for any UNTIL conditions + that are already included in a previous binlog file. Delete any such + from the UNTIL hash, to mark that such domains have already reached + their UNTIL condition. + */ + for (i= 0; i < glev->count; ++i) { - /* - The slave requested to start from the very beginning of this - domain in this binlog file. So delete the entry from the state, - we do not need to skip anything. - */ - state->remove(gtid); + const rpl_gtid *gtid= state->find(glev->list[i].domain_id); + if (!gtid) + { + /* + Contains_all_slave_gtid() returns false if there is any domain in + Gtid_list_event which is not in the requested slave position. + + We may delete a domain from the slave state inside this loop, but + we only do this when it is the very last GTID logged for that + domain in earlier binlogs, and then we can not encounter it in any + further GTIDs in the Gtid_list. + */ + DBUG_ASSERT(0); + } else if (gtid->server_id == glev->list[i].server_id && + gtid->seq_no == glev->list[i].seq_no) + { + /* + The slave requested to start from the very beginning of this + domain in this binlog file. So delete the entry from the state, + we do not need to skip anything. + */ + state->remove(gtid); + } + + if (until_gtid_state && + (gtid= until_gtid_state->find(glev->list[i].domain_id)) && + gtid->server_id == glev->list[i].server_id && + gtid->seq_no <= glev->list[i].seq_no) + { + /* + We've already reached the stop position in UNTIL for this domain, + since it is before the start position. + */ + until_gtid_state->remove(gtid); + } } } @@ -1163,6 +1324,7 @@ gtid_state_from_pos(const char *name, uint32 offset, goto end; } status= Gtid_list_log_event::peek(packet.ptr(), packet.length(), + current_checksum_alg, >id_list, &list_len); if (status) { @@ -1256,6 +1418,49 @@ gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str) } +static bool +is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset, + enum_gtid_until_state gtid_until_group, + Log_event_type event_type, uint8 current_checksum_alg, + ushort flags, const char **errmsg, + rpl_binlog_state *until_binlog_state) +{ + switch (gtid_until_group) + { + case GTID_UNTIL_NOT_DONE: + return false; + case GTID_UNTIL_STOP_AFTER_STANDALONE: + if (Log_event::is_part_of_group(event_type)) + return false; + break; + case GTID_UNTIL_STOP_AFTER_TRANSACTION: + if (event_type != XID_EVENT && + (event_type != QUERY_EVENT || + !Query_log_event::peek_is_commit_rollback(packet->ptr()+*ev_offset, + packet->length()-*ev_offset, + current_checksum_alg))) + return false; + break; + } + + /* + The last event group has been sent, now the START SLAVE UNTIL condition + has been reached. + + Send a last fake Gtid_list_log_event with a flag set to mark that we + stop due to UNTIL condition. + */ + if (reset_transmit_packet(thd, flags, ev_offset, errmsg)) + return true; + Gtid_list_log_event glev(until_binlog_state, + Gtid_list_log_event::FLAG_UNTIL_REACHED); + if (fake_gtid_list_event(net, packet, &glev, errmsg, current_checksum_alg)) + return true; + *errmsg= NULL; + return true; +} + + /* Helper function for mysql_binlog_send() to write an event down the slave connection. @@ -1268,37 +1473,142 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, IO_CACHE *log, int mariadb_slave_capability, ulong ev_offset, uint8 current_checksum_alg, bool using_gtid_state, slave_connection_state *gtid_state, - enum_gtid_skip_type *gtid_skip_group) + enum_gtid_skip_type *gtid_skip_group, + slave_connection_state *until_gtid_state, + enum_gtid_until_state *gtid_until_group, + rpl_binlog_state *until_binlog_state, + bool slave_gtid_strict_mode, rpl_gtid *error_gtid) { my_off_t pos; size_t len= packet->length(); + if (event_type == GTID_LIST_EVENT && using_gtid_state && until_gtid_state) + { + rpl_gtid *gtid_list; + uint32 list_len; + bool err; + + if (ev_offset > len || + Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset, + current_checksum_alg, + >id_list, &list_len)) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + return "Failed to read Gtid_list_log_event: corrupt binlog"; + } + err= until_binlog_state->load(gtid_list, list_len); + my_free(gtid_list); + if (err) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + return "Failed in internal GTID book-keeping: Out of memory"; + } + } + /* Skip GTID event groups until we reach slave position within a domain_id. */ - if (event_type == GTID_EVENT && using_gtid_state && gtid_state->count() > 0) + if (event_type == GTID_EVENT && using_gtid_state) { - uint32 server_id, domain_id; - uint64 seq_no; uchar flags2; rpl_gtid *gtid; - if (ev_offset > len || - Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset, - current_checksum_alg, - &domain_id, &server_id, &seq_no, &flags2)) - return "Failed to read Gtid_log_event: corrupt binlog"; - gtid= gtid_state->find(domain_id); - if (gtid != NULL) + if (gtid_state->count() > 0 || until_gtid_state) { - /* Skip this event group if we have not yet reached slave start pos. */ - if (server_id != gtid->server_id || seq_no <= gtid->seq_no) - *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? - GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); - /* - Delete this entry if we have reached slave start position (so we will - not skip subsequent events and won't have to look them up and check). - */ - if (server_id == gtid->server_id && seq_no >= gtid->seq_no) - gtid_state->remove(gtid); + rpl_gtid event_gtid; + + if (ev_offset > len || + Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset, + current_checksum_alg, + &event_gtid.domain_id, &event_gtid.server_id, + &event_gtid.seq_no, &flags2)) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + return "Failed to read Gtid_log_event: corrupt binlog"; + } + + if (until_binlog_state->update(&event_gtid, false)) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + return "Failed in internal GTID book-keeping: Out of memory"; + } + + if (gtid_state->count() > 0) + { + gtid= gtid_state->find(event_gtid.domain_id); + if (gtid != NULL) + { + /* Skip this event group if we have not yet reached slave start pos. */ + if (event_gtid.server_id != gtid->server_id || + event_gtid.seq_no <= gtid->seq_no) + *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? + GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); + if (event_gtid.server_id == gtid->server_id && + event_gtid.seq_no >= gtid->seq_no) + { + /* + In strict mode, it is an error if the slave requests to start in + a "hole" in the master's binlog: a GTID that does not exist, even + though both the prior and subsequent seq_no exists for same + domain_id and server_id. + */ + if (slave_gtid_strict_mode && event_gtid.seq_no > gtid->seq_no) + { + my_errno= ER_GTID_START_FROM_BINLOG_HOLE; + *error_gtid= *gtid; + return "The binlog on the master is missing the GTID requested " + "by the slave (even though both a prior and a subsequent " + "sequence number does exist), and GTID strict mode is enabled."; + } + /* + Delete this entry if we have reached slave start position (so we + will not skip subsequent events and won't have to look them up + and check). + */ + gtid_state->remove(gtid); + } + } + } + + if (until_gtid_state) + { + gtid= until_gtid_state->find(event_gtid.domain_id); + if (gtid == NULL) + { + /* + This domain already reached the START SLAVE UNTIL stop condition, + so skip this event group. + */ + *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? + GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); + } + else if (event_gtid.server_id == gtid->server_id && + event_gtid.seq_no >= gtid->seq_no) + { + /* + We have reached the stop condition. + Delete this domain_id from the hash, so we will skip all further + events in this domain and eventually stop when all domains are + done. + */ + uint64 until_seq_no= gtid->seq_no; + until_gtid_state->remove(gtid); + if (until_gtid_state->count() == 0) + *gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ? + GTID_UNTIL_STOP_AFTER_STANDALONE : + GTID_UNTIL_STOP_AFTER_TRANSACTION); + if (event_gtid.seq_no > until_seq_no) + { + /* + The GTID in START SLAVE UNTIL condition is missing in our binlog. + This should normally not happen (user error), but since we can be + sure that we are now beyond the position that the UNTIL condition + should be in, we can just stop now. And we also need to skip this + event group (as it is beyond the UNTIL condition). + */ + *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? + GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); + } + } + } } } @@ -1356,7 +1666,10 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, a no-operation on the slave. */ if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg)) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed to replace row annotate event with dummy: too small event."; + } } } @@ -1375,8 +1688,11 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, ev_offset, current_checksum_alg); if (err) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed to replace GTID event with backwards-compatible event: " "currupt event."; + } if (!need_dummy) return NULL; } @@ -1403,8 +1719,11 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, binlog positions. */ if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg)) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed to replace binlog checkpoint or gtid list event with " "dummy: too small event."; + } } } @@ -1428,24 +1747,37 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, pos= my_b_tell(log); if (RUN_HOOK(binlog_transmit, before_send_event, (thd, flags, packet, log_file_name, pos))) + { + my_errno= ER_UNKNOWN_ERROR; return "run 'before_send_event' hook failed"; + } if (my_net_write(net, (uchar*) packet->ptr(), len)) + { + my_errno= ER_UNKNOWN_ERROR; return "Failed on my_net_write()"; + } DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] )); if (event_type == LOAD_EVENT) { if (send_file(thd)) + { + my_errno= ER_UNKNOWN_ERROR; return "failed in send_file()"; + } } if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet))) + { + my_errno= ER_UNKNOWN_ERROR; return "Failed to run hook 'after_send_event'"; + } return NULL; /* Success */ } + void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags) { @@ -1465,12 +1797,18 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, mysql_mutex_t *log_lock; mysql_cond_t *log_cond; int mariadb_slave_capability; - char str_buf[256]; + char str_buf[128]; String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info); bool using_gtid_state; - slave_connection_state gtid_state, return_gtid_state; + char str_buf2[128]; + String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info); + slave_connection_state gtid_state, until_gtid_state_obj; + slave_connection_state *until_gtid_state= NULL; rpl_gtid error_gtid; enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT; + enum_gtid_until_state gtid_until_group= GTID_UNTIL_NOT_DONE; + rpl_binlog_state until_binlog_state; + bool slave_gtid_strict_mode; uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; int old_max_allowed_packet= thd->variables.max_allowed_packet; @@ -1502,6 +1840,13 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, connect_gtid_state.length(0); using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state); DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", using_gtid_state= false;); + if (using_gtid_state) + { + slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd); + if(get_slave_until_gtid(thd, &slave_until_gtid_str)) + until_gtid_state= &until_gtid_state_obj; + } + /* We want to corrupt the first event, in Log_event::read_log_event(). But we do not want the corruption to happen early, eg. when client does @@ -1557,13 +1902,23 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, my_errno= ER_UNKNOWN_ERROR; goto err; } + if (until_gtid_state && + until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(), + slave_until_gtid_str.length())) + { + errmsg= "Out of memory or malformed slave request when obtaining UNTIL " + "position sent from slave"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } if ((error= check_slave_start_position(thd, >id_state, &errmsg, - &error_gtid))) + &error_gtid, until_gtid_state))) { my_errno= error; goto err; } - if ((errmsg= gtid_find_binlog_file(>id_state, search_file_name))) + if ((errmsg= gtid_find_binlog_file(>id_state, search_file_name, + until_gtid_state))) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; goto err; @@ -1753,6 +2108,15 @@ impossible position"; /* The Format_description_log_event event will be found naturally. */ } + /* + Handle the case of START SLAVE UNTIL with an UNTIL condition already + fulfilled at the start position. + + We will send one event, the format_description, and then stop. + */ + if (until_gtid_state && until_gtid_state->count() == 0) + gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE; + /* seek to the requested position, to start the requested dump */ my_b_seek(&log, pos); // Seek will done on next read @@ -1833,12 +2197,26 @@ impossible position"; log_file_name, &log, mariadb_slave_capability, ev_offset, current_checksum_alg, using_gtid_state, - >id_state, >id_skip_group))) + >id_state, >id_skip_group, + until_gtid_state, >id_until_group, + &until_binlog_state, + slave_gtid_strict_mode, &error_gtid))) { errmsg= tmp_msg; - my_errno= ER_UNKNOWN_ERROR; goto err; } + if (until_gtid_state && + is_until_reached(thd, net, packet, &ev_offset, gtid_until_group, + event_type, current_checksum_alg, flags, &errmsg, + &until_binlog_state)) + { + if (errmsg) + { + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + goto end; + } DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid", { @@ -1950,6 +2328,8 @@ impossible position"; thd->ENTER_COND(log_cond, log_lock, &stage_master_has_sent_all_binlog_to_slave, &old_stage); + if (thd->killed) + break; ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts); DBUG_ASSERT(ret == 0 || (heartbeat_period != 0)); if (ret == ETIMEDOUT || ret == ETIME) @@ -1981,7 +2361,7 @@ impossible position"; { DBUG_PRINT("wait",("binary log received update or a broadcast signal caught")); } - } while (signal_cnt == mysql_bin_log.signal_cnt && !thd->killed); + } while (signal_cnt == mysql_bin_log.signal_cnt); thd->EXIT_COND(&old_stage); } break; @@ -1992,18 +2372,34 @@ impossible position"; goto err; } - if (read_packet && - (tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type, - log_file_name, &log, - mariadb_slave_capability, ev_offset, - current_checksum_alg, - using_gtid_state, >id_state, - >id_skip_group))) + if (read_packet) { - errmsg= tmp_msg; - my_errno= ER_UNKNOWN_ERROR; - goto err; - } + if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type, + log_file_name, &log, + mariadb_slave_capability, ev_offset, + current_checksum_alg, + using_gtid_state, >id_state, + >id_skip_group, until_gtid_state, + >id_until_group, &until_binlog_state, + slave_gtid_strict_mode, &error_gtid))) + { + errmsg= tmp_msg; + goto err; + } + if ( + until_gtid_state && + is_until_reached(thd, net, packet, &ev_offset, gtid_until_group, + event_type, current_checksum_alg, flags, &errmsg, + &until_binlog_state)) + { + if (errmsg) + { + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + goto end; + } + } log.error=0; } @@ -2099,6 +2495,17 @@ err: /* Use this error code so slave will know not to try reconnect. */ my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; } + else if (my_errno == ER_GTID_START_FROM_BINLOG_HOLE) + { + my_snprintf(error_text, sizeof(error_text), + "The binlog on the master is missing the GTID %u-%u-%llu " + "requested by the slave (even though both a prior and a " + "subsequent sequence number does exist), and GTID strict mode " + "is enabled", + error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no); + /* Use this error code so slave will know not to try reconnect. */ + my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; + } else if (my_errno == ER_CANNOT_LOAD_SLAVE_GTID_STATE) { my_snprintf(error_text, sizeof(error_text), @@ -2167,6 +2574,26 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) lock_slave_threads(mi); // this allows us to cleanly read slave_running // Get a mask of _stopped_ threads init_thread_mask(&thread_mask,mi,1 /* inverse */); + + if (thd->lex->mi.gtid_pos_str.str) + { + if (thread_mask != (SLAVE_IO|SLAVE_SQL)) + { + slave_errno= ER_SLAVE_WAS_RUNNING; + goto err; + } + if (thd->lex->slave_thd_opt) + { + slave_errno= ER_BAD_SLAVE_UNTIL_COND; + goto err; + } + if (mi->using_gtid == Master_info::USE_GTID_NO) + { + slave_errno= ER_UNTIL_REQUIRES_USING_GTID; + goto err; + } + } + /* Below we will start all stopped threads. But if the user wants to start only one thread, do as if the other thread was running (as we @@ -2213,10 +2640,22 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name, sizeof(mi->rli.until_log_name)-1); } + else if (thd->lex->mi.gtid_pos_str.str) + { + if (mi->rli.until_gtid_pos.load(thd->lex->mi.gtid_pos_str.str, + thd->lex->mi.gtid_pos_str.length)) + { + slave_errno= ER_INCORRECT_GTID_STATE; + mysql_mutex_unlock(&mi->rli.data_lock); + goto err; + } + mi->rli.until_condition= Relay_log_info::UNTIL_GTID; + } else mi->rli.clear_until_condition(); - if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE) + if (mi->rli.until_condition == Relay_log_info::UNTIL_MASTER_POS || + mi->rli.until_condition == Relay_log_info::UNTIL_RELAY_POS) { /* Preparing members for effective until condition checking */ const char *p= fn_ext(mi->rli.until_log_name); @@ -2239,7 +2678,10 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) /* mark the cached result of the UNTIL comparison as "undefined" */ mi->rli.until_log_names_cmp_result= Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN; + } + if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE) + { /* Issuing warning then started without --skip-slave-start */ if (!opt_skip_slave_start) push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, @@ -2271,6 +2713,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) ER(ER_SLAVE_WAS_RUNNING)); } +err: unlock_slave_threads(mi); if (slave_errno) @@ -2749,12 +3192,14 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos; } - if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_MI_ENABLE) - mi->using_gtid= true; - else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_MI_DISABLE || + if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_SLAVE_POS) + mi->using_gtid= Master_info::USE_GTID_SLAVE_POS; + else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_CURRENT_POS) + mi->using_gtid= Master_info::USE_GTID_CURRENT_POS; + else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_NO || lex_mi->log_file_name || lex_mi->pos || lex_mi->relay_log_name || lex_mi->relay_log_pos) - mi->using_gtid= false; + mi->using_gtid= Master_info::USE_GTID_NO; /* If user did specify neither host nor port nor any log name nor any log @@ -2810,7 +3255,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) goto err; } - if (mi->using_gtid) + if (mi->using_gtid != Master_info::USE_GTID_NO) { /* Clear the position in the master binlogs, so that we request the @@ -3293,7 +3738,7 @@ int log_loaded_block(IO_CACHE* file) /** - Initialise the slave replication state from the mysql.rpl_slave_state table. + Initialise the slave replication state from the mysql.gtid_slave_pos table. This is called each time an SQL thread starts, but the data is only actually loaded on the first call. @@ -3305,7 +3750,7 @@ int log_loaded_block(IO_CACHE* file) The one containing the current slave state is the one with the maximal sub_id value, within each domain_id. - CREATE TABLE mysql.rpl_slave_state ( + CREATE TABLE mysql.gtid_slave_pos ( domain_id INT UNSIGNED NOT NULL, sub_id BIGINT UNSIGNED NOT NULL, server_id INT UNSIGNED NOT NULL, @@ -3341,7 +3786,7 @@ rpl_append_gtid_state(String *dest, bool use_binlog) rpl_gtid *gtid_list= NULL; uint32 num_gtids= 0; - if (opt_bin_log && + if (use_binlog && opt_bin_log && (err= mysql_bin_log.get_most_recent_gtid_list(>id_list, &num_gtids))) return err; @@ -3353,9 +3798,10 @@ rpl_append_gtid_state(String *dest, bool use_binlog) bool -rpl_gtid_pos_check(char *str, size_t len) +rpl_gtid_pos_check(THD *thd, char *str, size_t len) { slave_connection_state tmp_slave_state; + bool gave_conflict_warning= false, gave_missing_warning= false; /* Check that we can parse the supplied string. */ if (tmp_slave_state.load(str, len)) @@ -3390,18 +3836,43 @@ rpl_gtid_pos_check(char *str, size_t len) continue; if (!(slave_gtid= tmp_slave_state.find(binlog_gtid->domain_id))) { - my_error(ER_MASTER_GTID_POS_MISSING_DOMAIN, MYF(0), - binlog_gtid->domain_id, binlog_gtid->domain_id, - binlog_gtid->server_id, binlog_gtid->seq_no); - break; + if (opt_gtid_strict_mode) + { + my_error(ER_MASTER_GTID_POS_MISSING_DOMAIN, MYF(0), + binlog_gtid->domain_id, binlog_gtid->domain_id, + binlog_gtid->server_id, binlog_gtid->seq_no); + break; + } + else if (!gave_missing_warning) + { + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + ER_MASTER_GTID_POS_MISSING_DOMAIN, + ER(ER_MASTER_GTID_POS_MISSING_DOMAIN), + binlog_gtid->domain_id, binlog_gtid->domain_id, + binlog_gtid->server_id, binlog_gtid->seq_no); + gave_missing_warning= true; + } } - if (slave_gtid->seq_no < binlog_gtid->seq_no) + else if (slave_gtid->seq_no < binlog_gtid->seq_no) { - my_error(ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG, MYF(0), - slave_gtid->domain_id, slave_gtid->server_id, - slave_gtid->seq_no, binlog_gtid->domain_id, - binlog_gtid->server_id, binlog_gtid->seq_no); - break; + if (opt_gtid_strict_mode) + { + my_error(ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG, MYF(0), + slave_gtid->domain_id, slave_gtid->server_id, + slave_gtid->seq_no, binlog_gtid->domain_id, + binlog_gtid->server_id, binlog_gtid->seq_no); + break; + } + else if (!gave_conflict_warning) + { + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG, + ER(ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG), + slave_gtid->domain_id, slave_gtid->server_id, + slave_gtid->seq_no, binlog_gtid->domain_id, + binlog_gtid->server_id, binlog_gtid->seq_no); + gave_conflict_warning= true; + } } } my_free(binlog_gtid_list); @@ -3416,7 +3887,7 @@ rpl_gtid_pos_check(char *str, size_t len) bool rpl_gtid_pos_update(THD *thd, char *str, size_t len) { - if (rpl_global_gtid_slave_state.load(thd, str, len, true)) + if (rpl_global_gtid_slave_state.load(thd, str, len, true, true)) { my_error(ER_FAILED_GTID_STATE_INIT, MYF(0)); return true; diff --git a/sql/sql_repl.h b/sql/sql_repl.h index 3af8f721bd7..820ffed0928 100644 --- a/sql/sql_repl.h +++ b/sql/sql_repl.h @@ -70,7 +70,7 @@ void rpl_init_gtid_slave_state(); void rpl_deinit_gtid_slave_state(); int gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str); int rpl_append_gtid_state(String *dest, bool use_binlog); -bool rpl_gtid_pos_check(char *str, size_t len); +bool rpl_gtid_pos_check(THD *thd, char *str, size_t len); bool rpl_gtid_pos_update(THD *thd, char *str, size_t len); #endif /* HAVE_REPLICATION */ diff --git a/sql/sql_show.cc b/sql/sql_show.cc index 48fb926bfcc..ba36c0fc63f 100644 --- a/sql/sql_show.cc +++ b/sql/sql_show.cc @@ -5977,8 +5977,8 @@ static int get_schema_stat_record(THD *thd, TABLE_LIST *tables, KEY *key=show_table->key_info+i; if (key->rec_per_key[j]) { - ha_rows records=((double) show_table->stat_records() / - key->actual_rec_per_key(j)); + ha_rows records= (ha_rows) ((double) show_table->stat_records() / + key->actual_rec_per_key(j)); table->field[9]->store((longlong) records, TRUE); table->field[9]->set_notnull(); } diff --git a/sql/sql_statistics.cc b/sql/sql_statistics.cc index f355f2c7760..2e2886a1d3f 100644 --- a/sql/sql_statistics.cc +++ b/sql/sql_statistics.cc @@ -337,7 +337,7 @@ protected: void store_record_for_lookup() { - store_record(stat_table, record[0]); + DBUG_ASSERT(record[0] == stat_table->record[0]); } bool update_record() diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index 447ccf5f469..6be7c23fb0d 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -1026,6 +1026,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token CUBE_SYM /* SQL-2003-R */ %token CURDATE /* MYSQL-FUNC */ %token CURRENT_USER /* SQL-2003-R */ +%token CURRENT_POS_SYM %token CURSOR_SYM /* SQL-2003-R */ %token CURSOR_NAME_SYM /* SQL-2003-N */ %token CURTIME /* MYSQL-FUNC */ @@ -1205,7 +1206,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token LOW_PRIORITY %token LT /* OPERATOR */ %token MASTER_CONNECT_RETRY_SYM -%token MASTER_USE_GTID_SYM +%token MASTER_GTID_POS_SYM %token MASTER_HOST_SYM %token MASTER_LOG_FILE_SYM %token MASTER_LOG_POS_SYM @@ -1223,6 +1224,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token MASTER_SSL_VERIFY_SERVER_CERT_SYM %token MASTER_SYM %token MASTER_USER_SYM +%token MASTER_USE_GTID_SYM %token MASTER_HEARTBEAT_PERIOD_SYM %token MATCH /* SQL-2003-R */ %token MAX_CONNECTIONS_PER_HOUR @@ -1406,6 +1408,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token SIMPLE_SYM /* SQL-2003-N */ %token SLAVE %token SLAVES +%token SLAVE_POS_SYM %token SLOW %token SMALLINT /* SQL-2003-R */ %token SNAPSHOT_SYM @@ -2191,15 +2194,34 @@ master_file_def: /* Adjust if < BIN_LOG_HEADER_SIZE (same comment as Lex->mi.pos) */ Lex->mi.relay_log_pos = max(BIN_LOG_HEADER_SIZE, Lex->mi.relay_log_pos); } - | MASTER_USE_GTID_SYM EQ ulong_num + | MASTER_USE_GTID_SYM EQ CURRENT_POS_SYM { - if (Lex->mi.use_gtid_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED) + if (Lex->mi.use_gtid_opt != LEX_MASTER_INFO::LEX_GTID_UNCHANGED) { my_error(ER_DUP_ARGUMENT, MYF(0), "MASTER_use_gtid"); MYSQL_YYABORT; } - Lex->mi.use_gtid_opt= $3 ? - LEX_MASTER_INFO::LEX_MI_ENABLE : LEX_MASTER_INFO::LEX_MI_DISABLE; + Lex->mi.use_gtid_opt= LEX_MASTER_INFO::LEX_GTID_CURRENT_POS; + } + ; + | MASTER_USE_GTID_SYM EQ SLAVE_POS_SYM + { + if (Lex->mi.use_gtid_opt != LEX_MASTER_INFO::LEX_GTID_UNCHANGED) + { + my_error(ER_DUP_ARGUMENT, MYF(0), "MASTER_use_gtid"); + MYSQL_YYABORT; + } + Lex->mi.use_gtid_opt= LEX_MASTER_INFO::LEX_GTID_SLAVE_POS; + } + ; + | MASTER_USE_GTID_SYM EQ NO_SYM + { + if (Lex->mi.use_gtid_opt != LEX_MASTER_INFO::LEX_GTID_UNCHANGED) + { + my_error(ER_DUP_ARGUMENT, MYF(0), "MASTER_use_gtid"); + MYSQL_YYABORT; + } + Lex->mi.use_gtid_opt= LEX_MASTER_INFO::LEX_GTID_NO; } ; @@ -7378,6 +7400,10 @@ slave_until: MYSQL_YYABORT; } } + | UNTIL_SYM MASTER_GTID_POS_SYM EQ TEXT_STRING_sys + { + Lex->mi.gtid_pos_str = $4; + } ; slave_until_opts: @@ -13412,6 +13438,7 @@ keyword_sp: | CONSTRAINT_NAME_SYM {} | CONTEXT_SYM {} | CONTRIBUTORS_SYM {} + | CURRENT_POS_SYM {} | CPU_SYM {} | CUBE_SYM {} | CURSOR_NAME_SYM {} @@ -13489,12 +13516,13 @@ keyword_sp: | MAX_ROWS {} | MASTER_SYM {} | MASTER_HEARTBEAT_PERIOD_SYM {} - | MASTER_USE_GTID_SYM {} + | MASTER_GTID_POS_SYM {} | MASTER_HOST_SYM {} | MASTER_PORT_SYM {} | MASTER_LOG_FILE_SYM {} | MASTER_LOG_POS_SYM {} | MASTER_USER_SYM {} + | MASTER_USE_GTID_SYM {} | MASTER_PASSWORD_SYM {} | MASTER_SERVER_ID_SYM {} | MASTER_CONNECT_RETRY_SYM {} @@ -13599,6 +13627,7 @@ keyword_sp: | SIMPLE_SYM {} | SHARE_SYM {} | SHUTDOWN {} + | SLAVE_POS_SYM {} | SLOW {} | SNAPSHOT_SYM {} | SOFT_SYM {} diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 8c5a6c92a6b..3c78901addf 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -1345,6 +1345,26 @@ static Sys_var_uint Sys_gtid_domain_id( BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(check_has_super)); + +static bool check_gtid_seq_no(sys_var *self, THD *thd, set_var *var) +{ + uint32 domain_id, server_id; + uint64_t seq_no; + + if (check_has_super(self, thd, var)) + return true; + domain_id= thd->variables.gtid_domain_id; + server_id= thd->variables.server_id; + seq_no= (uint64)var->value->val_uint(); + DBUG_EXECUTE_IF("ignore_set_gtid_seq_no_check", return 0;); + if (opt_gtid_strict_mode && opt_bin_log && + mysql_bin_log.check_strict_gtid_sequence(domain_id, server_id, seq_no)) + return true; + + return false; +} + + static Sys_var_ulonglong Sys_gtid_seq_no( "gtid_seq_no", "Internal server usage, for replication with global transaction id. " @@ -1354,12 +1374,65 @@ static Sys_var_ulonglong Sys_gtid_seq_no( SESSION_ONLY(gtid_seq_no), NO_CMD_LINE, VALID_RANGE(0, ULONGLONG_MAX), DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG, - ON_CHECK(check_has_super)); + ON_CHECK(check_gtid_seq_no)); #ifdef HAVE_REPLICATION +static unsigned char opt_gtid_binlog_pos_dummy; +static Sys_var_gtid_binlog_pos Sys_gtid_binlog_pos( + "gtid_binlog_pos", "Last GTID logged to the binary log, per replication" + "domain", + READ_ONLY GLOBAL_VAR(opt_gtid_binlog_pos_dummy), NO_CMD_LINE); + + +uchar * +Sys_var_gtid_binlog_pos::global_value_ptr(THD *thd, LEX_STRING *base) +{ + char buf[128]; + String str(buf, sizeof(buf), system_charset_info); + char *p; + + str.length(0); + if ((opt_bin_log && mysql_bin_log.append_state_pos(&str)) || + !(p= thd->strmake(str.ptr(), str.length()))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return NULL; + } + + return (uchar *)p; +} + + +static unsigned char opt_gtid_current_pos_dummy; +static Sys_var_gtid_current_pos Sys_gtid_current_pos( + "gtid_current_pos", "Current GTID position of the server. Per " + "replication domain, this is either the last GTID replicated by a " + "slave thread, or the GTID logged to the binary log, whichever is " + "most recent.", + READ_ONLY GLOBAL_VAR(opt_gtid_current_pos_dummy), NO_CMD_LINE); + + +uchar * +Sys_var_gtid_current_pos::global_value_ptr(THD *thd, LEX_STRING *base) +{ + String str; + char *p; + + str.length(0); + if (rpl_append_gtid_state(&str, true) || + !(p= thd->strmake(str.ptr(), str.length()))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return NULL; + } + + return (uchar *)p; +} + + bool -Sys_var_gtid_pos::do_check(THD *thd, set_var *var) +Sys_var_gtid_slave_pos::do_check(THD *thd, set_var *var) { String str, *res; bool running; @@ -1372,7 +1445,12 @@ Sys_var_gtid_pos::do_check(THD *thd, set_var *var) return true; if (!(res= var->value->val_str(&str))) return true; - if (rpl_gtid_pos_check(&((*res)[0]), res->length())) + if (thd->in_active_multi_stmt_transaction()) + { + my_error(ER_CANT_DO_THIS_DURING_AN_TRANSACTION, MYF(0)); + return true; + } + if (rpl_gtid_pos_check(thd, &((*res)[0]), res->length())) return true; if (!(var->save_result.string_value.str= @@ -1387,7 +1465,7 @@ Sys_var_gtid_pos::do_check(THD *thd, set_var *var) bool -Sys_var_gtid_pos::global_update(THD *thd, set_var *var) +Sys_var_gtid_slave_pos::global_update(THD *thd, set_var *var) { bool err; @@ -1413,13 +1491,13 @@ Sys_var_gtid_pos::global_update(THD *thd, set_var *var) uchar * -Sys_var_gtid_pos::global_value_ptr(THD *thd, LEX_STRING *base) +Sys_var_gtid_slave_pos::global_value_ptr(THD *thd, LEX_STRING *base) { String str; char *p; str.length(0); - if (rpl_append_gtid_state(&str, true) || + if (rpl_append_gtid_state(&str, false) || !(p= thd->strmake(str.ptr(), str.length()))) { my_error(ER_OUT_OF_RESOURCES, MYF(0)); @@ -1430,14 +1508,21 @@ Sys_var_gtid_pos::global_value_ptr(THD *thd, LEX_STRING *base) } -static unsigned char opt_gtid_pos_dummy; -static Sys_var_gtid_pos Sys_gtid_pos( - "gtid_pos", +static unsigned char opt_gtid_slave_pos_dummy; +static Sys_var_gtid_slave_pos Sys_gtid_slave_pos( + "gtid_slave_pos", "The list of global transaction IDs that were last replicated on the " - "server, one for each replication domain. This defines where a slave " - "starts replicating from on a master when connecting with global " - "transaction ID.", - GLOBAL_VAR(opt_gtid_pos_dummy), NO_CMD_LINE); + "server, one for each replication domain.", + GLOBAL_VAR(opt_gtid_slave_pos_dummy), NO_CMD_LINE); + + +static Sys_var_mybool Sys_gtid_strict_mode( + "gtid_strict_mode", + "Enforce strict seq_no ordering of events in the binary log. Slave " + "stops with an error if it encounters an event that would cause it to " + "generate an out-of-order binlog if executed.", + GLOBAL_VAR(opt_gtid_strict_mode), + CMD_LINE(OPT_ARG), DEFAULT(FALSE)); #endif diff --git a/sql/sys_vars.h b/sql/sys_vars.h index 4a38b41ab9b..bf17040e65c 100644 --- a/sql/sys_vars.h +++ b/sql/sys_vars.h @@ -2049,12 +2049,114 @@ public: /** - Class for @@global.gtid_pos. + Class for @@global.gtid_current_pos. */ -class Sys_var_gtid_pos: public sys_var +class Sys_var_gtid_current_pos: public sys_var { public: - Sys_var_gtid_pos(const char *name_arg, + Sys_var_gtid_current_pos(const char *name_arg, + const char *comment, int flag_args, ptrdiff_t off, size_t size, + CMD_LINE getopt) + : sys_var(&all_sys_vars, name_arg, comment, flag_args, off, getopt.id, + getopt.arg_type, SHOW_CHAR, 0, NULL, VARIABLE_NOT_IN_BINLOG, + NULL, NULL, NULL) + { + option.var_type= GET_STR; + } + bool do_check(THD *thd, set_var *var) + { + DBUG_ASSERT(false); + return true; + } + bool session_update(THD *thd, set_var *var) + { + DBUG_ASSERT(false); + return true; + } + bool global_update(THD *thd, set_var *var) + { + DBUG_ASSERT(false); + return true; + } + bool check_update_type(Item_result type) { + DBUG_ASSERT(false); + return false; + } + void session_save_default(THD *thd, set_var *var) + { + DBUG_ASSERT(false); + } + void global_save_default(THD *thd, set_var *var) + { + DBUG_ASSERT(false); + } + uchar *session_value_ptr(THD *thd, LEX_STRING *base) + { + DBUG_ASSERT(false); + return NULL; + } + uchar *global_value_ptr(THD *thd, LEX_STRING *base); +}; + + +/** + Class for @@global.gtid_binlog_pos. +*/ +class Sys_var_gtid_binlog_pos: public sys_var +{ +public: + Sys_var_gtid_binlog_pos(const char *name_arg, + const char *comment, int flag_args, ptrdiff_t off, size_t size, + CMD_LINE getopt) + : sys_var(&all_sys_vars, name_arg, comment, flag_args, off, getopt.id, + getopt.arg_type, SHOW_CHAR, 0, NULL, VARIABLE_NOT_IN_BINLOG, + NULL, NULL, NULL) + { + option.var_type= GET_STR; + } + bool do_check(THD *thd, set_var *var) + { + DBUG_ASSERT(false); + return true; + } + bool session_update(THD *thd, set_var *var) + { + DBUG_ASSERT(false); + return true; + } + bool global_update(THD *thd, set_var *var) + { + DBUG_ASSERT(false); + return true; + } + bool check_update_type(Item_result type) { + DBUG_ASSERT(false); + return false; + } + void session_save_default(THD *thd, set_var *var) + { + DBUG_ASSERT(false); + } + void global_save_default(THD *thd, set_var *var) + { + DBUG_ASSERT(false); + } + uchar *session_value_ptr(THD *thd, LEX_STRING *base) + { + DBUG_ASSERT(false); + return NULL; + } + uchar *global_value_ptr(THD *thd, LEX_STRING *base); +}; + + +/** + Class for @@global.gtid_slave_pos. +*/ +class Sys_var_gtid_slave_pos: public sys_var +{ +public: + Sys_var_gtid_slave_pos(const char *name_arg, const char *comment, int flag_args, ptrdiff_t off, size_t size, CMD_LINE getopt) : sys_var(&all_sys_vars, name_arg, comment, flag_args, off, getopt.id, |