summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-05-28 15:39:56 +0200
committerunknown <knielsen@knielsen-hq.org>2013-05-28 15:39:56 +0200
commita0fd7382bc1e4a8ba0affd27d3034f445a4bb453 (patch)
treec8b4b1dabddbc47829a890d74025f7889e692425 /sql
parent08ce9bfe057b6cd31e7fbca4e4e9e48edde242fb (diff)
parentee2b7db3f88f6882022a8aa71b30043ed8b40792 (diff)
downloadmariadb-git-a0fd7382bc1e4a8ba0affd27d3034f445a4bb453.tar.gz
Merge 10.0-base -> 10.0
Diffstat (limited to 'sql')
-rw-r--r--sql/item_strfunc.cc5
-rw-r--r--sql/lex.h5
-rw-r--r--sql/log.cc133
-rw-r--r--sql/log.h5
-rw-r--r--sql/log_event.cc104
-rw-r--r--sql/log_event.h28
-rw-r--r--sql/multi_range_read.cc4
-rw-r--r--sql/mysqld.cc13
-rw-r--r--sql/mysqld.h5
-rw-r--r--sql/opt_range.cc38
-rw-r--r--sql/rpl_gtid.cc313
-rw-r--r--sql/rpl_gtid.h23
-rw-r--r--sql/rpl_mi.cc15
-rw-r--r--sql/rpl_mi.h10
-rw-r--r--sql/rpl_rli.cc15
-rw-r--r--sql/rpl_rli.h8
-rw-r--r--sql/set_var.cc2
-rw-r--r--sql/share/errmsg-utf8.txt10
-rw-r--r--sql/slave.cc152
-rw-r--r--sql/sql_join_cache.cc4
-rw-r--r--sql/sql_lex.h13
-rw-r--r--sql/sql_repl.cc739
-rw-r--r--sql/sql_repl.h2
-rw-r--r--sql/sql_show.cc4
-rw-r--r--sql/sql_statistics.cc2
-rw-r--r--sql/sql_yacc.yy41
-rw-r--r--sql/sys_vars.cc111
-rw-r--r--sql/sys_vars.h108
28 files changed, 1531 insertions, 381 deletions
diff --git a/sql/item_strfunc.cc b/sql/item_strfunc.cc
index e5c3c48b3bb..5dcd83681c6 100644
--- a/sql/item_strfunc.cc
+++ b/sql/item_strfunc.cc
@@ -3865,7 +3865,10 @@ bool Item_func_dyncol_create::fix_fields(THD *thd, Item **ref)
vals= (DYNAMIC_COLUMN_VALUE *) alloc_root(thd->mem_root,
sizeof(DYNAMIC_COLUMN_VALUE) *
(arg_count / 2));
- for (i= 0; i + 1 < arg_count && args[i]->result_type() == INT_RESULT; i+= 2);
+ for (i= 0;
+ i + 1 < arg_count && args[i]->result_type() == INT_RESULT;
+ i+= 2)
+ ;
if (i + 1 < arg_count)
{
names= TRUE;
diff --git a/sql/lex.h b/sql/lex.h
index c579ea84533..7edb1456e09 100644
--- a/sql/lex.h
+++ b/sql/lex.h
@@ -152,6 +152,7 @@ static SYMBOL symbols[] = {
{ "CROSS", SYM(CROSS)},
{ "CUBE", SYM(CUBE_SYM)},
{ "CURRENT_DATE", SYM(CURDATE)},
+ { "CURRENT_POS", SYM(CURRENT_POS_SYM)},
{ "CURRENT_TIME", SYM(CURTIME)},
{ "CURRENT_TIMESTAMP", SYM(NOW_SYM)},
{ "CURRENT_USER", SYM(CURRENT_USER)},
@@ -330,7 +331,7 @@ static SYMBOL symbols[] = {
{ "LOW_PRIORITY", SYM(LOW_PRIORITY)},
{ "MASTER", SYM(MASTER_SYM)},
{ "MASTER_CONNECT_RETRY", SYM(MASTER_CONNECT_RETRY_SYM)},
- { "MASTER_USE_GTID", SYM(MASTER_USE_GTID_SYM)},
+ { "MASTER_GTID_POS", SYM(MASTER_GTID_POS_SYM)},
{ "MASTER_HOST", SYM(MASTER_HOST_SYM)},
{ "MASTER_LOG_FILE", SYM(MASTER_LOG_FILE_SYM)},
{ "MASTER_LOG_POS", SYM(MASTER_LOG_POS_SYM)},
@@ -347,6 +348,7 @@ static SYMBOL symbols[] = {
{ "MASTER_SSL_KEY", SYM(MASTER_SSL_KEY_SYM)},
{ "MASTER_SSL_VERIFY_SERVER_CERT", SYM(MASTER_SSL_VERIFY_SERVER_CERT_SYM)},
{ "MASTER_USER", SYM(MASTER_USER_SYM)},
+ { "MASTER_USE_GTID", SYM(MASTER_USE_GTID_SYM)},
{ "MASTER_HEARTBEAT_PERIOD", SYM(MASTER_HEARTBEAT_PERIOD_SYM)},
{ "MATCH", SYM(MATCH)},
{ "MAX_CONNECTIONS_PER_HOUR", SYM(MAX_CONNECTIONS_PER_HOUR)},
@@ -518,6 +520,7 @@ static SYMBOL symbols[] = {
{ "SIMPLE", SYM(SIMPLE_SYM)},
{ "SLAVE", SYM(SLAVE)},
{ "SLAVES", SYM(SLAVES)},
+ { "SLAVE_POS", SYM(SLAVE_POS_SYM)},
{ "SLOW", SYM(SLOW)},
{ "SNAPSHOT", SYM(SNAPSHOT_SYM)},
{ "SMALLINT", SYM(SMALLINT)},
diff --git a/sql/log.cc b/sql/log.cc
index 48c458c2607..6726be36e74 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -119,7 +119,6 @@ static MYSQL_BIN_LOG::xid_count_per_binlog *
static bool start_binlog_background_thread();
-
static rpl_binlog_state rpl_global_gtid_binlog_state;
/**
@@ -3007,6 +3006,14 @@ void MYSQL_BIN_LOG::cleanup()
mysql_cond_destroy(&COND_binlog_background_thread);
mysql_cond_destroy(&COND_binlog_background_thread_end);
}
+
+ /*
+ Free data for global binlog state.
+ We can't do that automaticly as we need to do this before
+ safemalloc is shut down
+ */
+ if (!is_relay_log)
+ rpl_global_gtid_binlog_state.free();
DBUG_VOID_RETURN;
}
@@ -3286,7 +3293,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
there had been an entry (domain_id, server_id, 0).
*/
- Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state);
+ Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state, 0);
if (gl_ev.write(&log_file))
goto err;
@@ -3847,9 +3854,6 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log)
if (!is_relay_log)
{
rpl_global_gtid_binlog_state.reset();
- mysql_mutex_lock(&LOCK_gtid_counter);
- global_gtid_counter= 0;
- mysql_mutex_unlock(&LOCK_gtid_counter);
}
/* Start logging with a new file */
@@ -5373,9 +5377,11 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
bool is_transactional)
{
rpl_gtid gtid;
- uint64 seq_no;
+ uint32 domain_id= thd->variables.gtid_domain_id;
+ uint32 server_id= thd->variables.server_id;
+ uint64 seq_no= thd->variables.gtid_seq_no;
+ int err;
- seq_no= thd->variables.gtid_seq_no;
/*
Reset the session variable gtid_seq_no, to reduce the risk of accidentally
producing a duplicate GTID.
@@ -5383,34 +5389,36 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
thd->variables.gtid_seq_no= 0;
if (seq_no != 0)
{
- /*
- If we see a higher sequence number, use that one as the basis of any
- later generated sequence numbers.
- */
- bump_seq_no_counter_if_needed(seq_no);
+ /* Use the specified sequence number. */
+ gtid.domain_id= domain_id;
+ gtid.server_id= server_id;
+ gtid.seq_no= seq_no;
+ mysql_mutex_lock(&LOCK_rpl_gtid_state);
+ err= rpl_global_gtid_binlog_state.update(&gtid, opt_gtid_strict_mode);
+ mysql_mutex_unlock(&LOCK_rpl_gtid_state);
+ if (err && thd->stmt_da->sql_errno()==ER_GTID_STRICT_OUT_OF_ORDER)
+ errno= ER_GTID_STRICT_OUT_OF_ORDER;
}
else
{
- mysql_mutex_lock(&LOCK_gtid_counter);
- seq_no= ++global_gtid_counter;
- mysql_mutex_unlock(&LOCK_gtid_counter);
+ /* Allocate the next sequence number for the GTID. */
+ mysql_mutex_lock(&LOCK_rpl_gtid_state);
+ err= rpl_global_gtid_binlog_state.update_with_next_gtid(domain_id,
+ server_id, &gtid);
+ mysql_mutex_unlock(&LOCK_rpl_gtid_state);
+ seq_no= gtid.seq_no;
}
- gtid.seq_no= seq_no;
- gtid.domain_id= thd->variables.gtid_domain_id;
+ if (err)
+ return true;
- Gtid_log_event gtid_event(thd, gtid.seq_no, gtid.domain_id, standalone,
+ Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone,
LOG_EVENT_SUPPRESS_USE_F, is_transactional);
- gtid.server_id= gtid_event.server_id;
/* Write the event to the binary log. */
if (gtid_event.write(&mysql_bin_log.log_file))
return true;
status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written);
- /* Update the replication state (last GTID in each replication domain). */
- mysql_mutex_lock(&LOCK_rpl_gtid_state);
- rpl_global_gtid_binlog_state.update(&gtid);
- mysql_mutex_unlock(&LOCK_rpl_gtid_state);
return false;
}
@@ -5511,9 +5519,6 @@ end:
end_io_cache(&cache);
if (opened)
mysql_file_close(file_no, MYF(0));
- /* Pick the next unused seq_no from the loaded binlog state. */
- bump_seq_no_counter_if_needed(
- rpl_global_gtid_binlog_state.seq_no_from_state());
return err;
}
@@ -5527,6 +5532,18 @@ MYSQL_BIN_LOG::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
bool
+MYSQL_BIN_LOG::append_state_pos(String *str)
+{
+ bool err;
+
+ mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+ err= rpl_global_gtid_binlog_state.append_pos(str);
+ mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+ return err;
+}
+
+
+bool
MYSQL_BIN_LOG::find_in_binlog_state(uint32 domain_id, uint32 server_id,
rpl_gtid *out_gtid)
{
@@ -5543,33 +5560,44 @@ bool
MYSQL_BIN_LOG::lookup_domain_in_binlog_state(uint32 domain_id,
rpl_gtid *out_gtid)
{
- rpl_binlog_state::element *elem;
- bool res;
+ rpl_gtid *found_gtid;
+ bool res= false;
mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
- elem= (rpl_binlog_state::element *)
- my_hash_search(&rpl_global_gtid_binlog_state.hash,
- (const uchar *)&domain_id, 0);
- if (elem)
+ if ((found_gtid= rpl_global_gtid_binlog_state.find_most_recent(domain_id)))
{
+ *out_gtid= *found_gtid;
res= true;
- *out_gtid= *elem->last_gtid;
}
- else
- res= false;
mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
return res;
}
-void
-MYSQL_BIN_LOG::bump_seq_no_counter_if_needed(uint64 seq_no)
+int
+MYSQL_BIN_LOG::bump_seq_no_counter_if_needed(uint32 domain_id, uint64 seq_no)
{
- mysql_mutex_lock(&LOCK_gtid_counter);
- if (global_gtid_counter < seq_no)
- global_gtid_counter= seq_no;
- mysql_mutex_unlock(&LOCK_gtid_counter);
+ int err;
+
+ mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+ err= rpl_global_gtid_binlog_state.bump_seq_no_if_needed(domain_id, seq_no);
+ mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+ return err;
+}
+
+
+bool
+MYSQL_BIN_LOG::check_strict_gtid_sequence(uint32 domain_id, uint32 server_id,
+ uint64 seq_no)
+{
+ bool err;
+
+ mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+ err= rpl_global_gtid_binlog_state.check_strict_sequence(domain_id, server_id,
+ seq_no);
+ mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+ return err;
}
@@ -5642,7 +5670,8 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
my_org_b_tell= my_b_tell(file);
mysql_mutex_lock(&LOCK_log);
prev_binlog_id= current_binlog_id;
- write_gtid_event(thd, true, using_trans);
+ if (write_gtid_event(thd, true, using_trans))
+ goto err;
}
else
{
@@ -6711,8 +6740,8 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
*/
DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || !cache_mngr->trx_cache.empty());
- current->error= write_transaction_or_stmt(current);
-
+ if ((current->error= write_transaction_or_stmt(current)))
+ current->commit_errno= errno;
strmake(cache_mngr->last_commit_pos_file, log_file_name,
sizeof(cache_mngr->last_commit_pos_file)-1);
commit_offset= my_b_write_tell(&log_file);
@@ -8257,9 +8286,6 @@ int TC_LOG_BINLOG::open(const char *opt_name)
error= recover(&log_info, log_name, &log,
(Format_description_log_event *)ev);
state_read= true;
- /* Pick the next unused seq_no from the recovered binlog state. */
- bump_seq_no_counter_if_needed(
- rpl_global_gtid_binlog_state.seq_no_from_state());
}
else
error= read_state_from_file();
@@ -8513,7 +8539,7 @@ binlog_background_thread(void *arg __attribute__((unused)))
thd->store_globals();
/*
- Load the slave replication GTID state from the mysql.rpl_slave_state
+ Load the slave replication GTID state from the mysql.gtid_slave_pos
table.
This is mostly so that we can start our seq_no counter from the highest
@@ -8723,16 +8749,11 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
case GTID_LIST_EVENT:
if (first_round)
{
- uint32 i;
Gtid_list_log_event *glev= (Gtid_list_log_event *)ev;
/* Initialise the binlog state from the Gtid_list event. */
- rpl_global_gtid_binlog_state.reset();
- for (i= 0; i < glev->count; ++i)
- {
- if (rpl_global_gtid_binlog_state.update(&(glev->list[i])))
- goto err2;
- }
+ if (rpl_global_gtid_binlog_state.load(glev->list, glev->count))
+ goto err2;
}
break;
@@ -8746,7 +8767,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
gtid.domain_id= gev->domain_id;
gtid.server_id= gev->server_id;
gtid.seq_no= gev->seq_no;
- if (rpl_global_gtid_binlog_state.update(&gtid))
+ if (rpl_global_gtid_binlog_state.update(&gtid, false))
goto err2;
}
break;
diff --git a/sql/log.h b/sql/log.h
index bd20c8aee09..018ac64eff7 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -779,10 +779,13 @@ public:
int read_state_from_file();
int write_state_to_file();
int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
+ bool append_state_pos(String *str);
bool find_in_binlog_state(uint32 domain_id, uint32 server_id,
rpl_gtid *out_gtid);
bool lookup_domain_in_binlog_state(uint32 domain_id, rpl_gtid *out_gtid);
- void bump_seq_no_counter_if_needed(uint64 seq_no);
+ int bump_seq_no_counter_if_needed(uint32 domain_id, uint64 seq_no);
+ bool check_strict_gtid_sequence(uint32 domain_id, uint32 server_id,
+ uint64 seq_no);
};
class Log_event_handler
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 3eddd8bf2eb..fce0130e8dd 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -4004,7 +4004,7 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0;
gtid= rli->current_gtid;
- if (rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true))
+ if (rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true, false))
{
rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
"Error during COMMIT: failed to update GTID state in "
@@ -6214,6 +6214,15 @@ Gtid_log_event::do_apply_event(Relay_log_info const *rli)
thd->variables.gtid_domain_id= this->domain_id;
thd->variables.gtid_seq_no= this->seq_no;
+ if (opt_gtid_strict_mode && opt_bin_log && opt_log_slave_updates)
+ {
+ /* Need to reset prior "ok" status to give an error. */
+ thd->clear_error();
+ thd->stmt_da->reset_diagnostics_area();
+ if (mysql_bin_log.check_strict_gtid_sequence(this->domain_id,
+ this->server_id, this->seq_no))
+ return 1;
+ }
if (flags2 & FL_STANDALONE)
return 0;
@@ -6320,6 +6329,7 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
: Log_event(buf, description_event), count(0), list(0)
{
uint32 i;
+ uint32 val;
uint8 header_size= description_event->common_header_len;
uint8 post_header_len= description_event->post_header_len[GTID_LIST_EVENT-1];
if (event_len < header_size + post_header_len ||
@@ -6327,7 +6337,9 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
return;
buf+= header_size;
- count= uint4korr(buf) & ((1<<28)-1);
+ val= uint4korr(buf);
+ count= val & ((1<<28)-1);
+ gl_flags= val & ((uint32)0xf << 28);
buf+= 4;
if (event_len - (header_size + post_header_len) < count*element_size ||
(!(list= (rpl_gtid *)my_malloc(count*sizeof(*list) + (count == 0),
@@ -6348,8 +6360,9 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
#ifdef MYSQL_SERVER
-Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set)
- : count(gtid_set->count()), list(0)
+Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set,
+ uint32 gl_flags_)
+ : count(gtid_set->count()), gl_flags(gl_flags_), list(0)
{
cache_type= EVENT_NO_CACHE;
/* Failure to allocate memory will be caught by is_valid() returning false. */
@@ -6359,32 +6372,73 @@ Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set)
gtid_set->get_gtid_list(list, count);
}
+
+#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
bool
-Gtid_list_log_event::write(IO_CACHE *file)
+Gtid_list_log_event::to_packet(String *packet)
{
uint32 i;
- uchar buf[element_size];
+ uchar *p;
+ uint32 needed_length;
DBUG_ASSERT(count < 1<<28);
- if (write_header(file, get_data_size()))
- return 1;
- int4store(buf, count & ((1<<28)-1));
- if (wrapper_my_b_safe_write(file, buf, GTID_LIST_HEADER_LEN))
- return 1;
+ needed_length= packet->length() + get_data_size();
+ if (packet->reserve(needed_length))
+ return true;
+ p= (uchar *)packet->ptr() + packet->length();;
+ packet->length(needed_length);
+ int4store(p, (count & ((1<<28)-1)) | gl_flags);
+ p += 4;
+ /* Initialise the padding for empty Gtid_list. */
+ if (count == 0)
+ int2store(p, 0);
for (i= 0; i < count; ++i)
{
- int4store(buf, list[i].domain_id);
- int4store(buf+4, list[i].server_id);
- int8store(buf+8, list[i].seq_no);
- if (wrapper_my_b_safe_write(file, buf, element_size))
- return 1;
+ int4store(p, list[i].domain_id);
+ int4store(p+4, list[i].server_id);
+ int8store(p+8, list[i].seq_no);
+ p += 16;
}
- return write_footer(file);
+
+ return false;
+}
+
+
+bool
+Gtid_list_log_event::write(IO_CACHE *file)
+{
+ char buf[128];
+ String packet(buf, sizeof(buf), system_charset_info);
+
+ packet.length(0);
+ if (to_packet(&packet))
+ return true;
+ return
+ write_header(file, get_data_size()) ||
+ wrapper_my_b_safe_write(file, (uchar *)packet.ptr(), packet.length()) ||
+ write_footer(file);
+}
+
+
+int
+Gtid_list_log_event::do_apply_event(Relay_log_info const *rli)
+{
+ int ret= Log_event::do_apply_event(rli);
+ if (rli->until_condition == Relay_log_info::UNTIL_GTID &&
+ (gl_flags & FLAG_UNTIL_REACHED))
+ {
+ char str_buf[128];
+ String str(str_buf, sizeof(str_buf), system_charset_info);
+ const_cast<Relay_log_info*>(rli)->until_gtid_pos.to_string(&str);
+ sql_print_information("Slave SQL thread stops because it reached its"
+ " UNTIL master_gtid_pos %s", str.c_ptr_safe());
+ const_cast<Relay_log_info*>(rli)->abort_slave= true;
+ }
+ return ret;
}
-#ifdef HAVE_REPLICATION
void
Gtid_list_log_event::pack_info(THD *thd, Protocol *protocol)
{
@@ -6439,12 +6493,24 @@ Gtid_list_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
*/
bool
Gtid_list_log_event::peek(const char *event_start, uint32 event_len,
+ uint8 checksum_alg,
rpl_gtid **out_gtid_list, uint32 *out_list_len)
{
const char *p;
uint32 count_field, count;
rpl_gtid *gtid_list;
+ if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
+ {
+ if (event_len > BINLOG_CHECKSUM_LEN)
+ event_len-= BINLOG_CHECKSUM_LEN;
+ else
+ event_len= 0;
+ }
+ else
+ DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
+
if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN)
return true;
p= event_start + LOG_EVENT_HEADER_LEN;
@@ -6841,7 +6907,7 @@ int Xid_log_event::do_apply_event(Relay_log_info const *rli)
const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0;
gtid= rli->current_gtid;
- err= rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true);
+ err= rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true, false);
if (err)
{
rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
diff --git a/sql/log_event.h b/sql/log_event.h
index 5026b280b27..b5b488f320d 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -3116,7 +3116,7 @@ public:
<td>count</td>
<td>4 byte unsigned integer</td>
<td>The lower 28 bits are the number of GTIDs. The upper 4 bits are
- reserved for flags bits for future expansion</td>
+ flags bits.</td>
</tr>
</table>
@@ -3149,18 +3149,28 @@ public:
</table>
The three elements in the body repeat COUNT times to form the GTID list.
+
+ At the time of writing, only one flag bit is in use.
+
+ Bit 28 of `count' is used for flag FLAG_UNTIL_REACHED, which is sent in a
+ Gtid_list event from the master to the slave to indicate that the START
+ SLAVE UNTIL master_gtid_pos=xxx condition has been reached. (This flag is
+ only sent in "fake" events generated on the fly, it is not written into
+ the binlog).
*/
class Gtid_list_log_event: public Log_event
{
public:
uint32 count;
+ uint32 gl_flags;
struct rpl_gtid *list;
static const uint element_size= 4+4+8;
+ static const uint32 FLAG_UNTIL_REACHED= (1<<28);
#ifdef MYSQL_SERVER
- Gtid_list_log_event(rpl_binlog_state *gtid_set);
+ Gtid_list_log_event(rpl_binlog_state *gtid_set, uint32 gl_flags);
#ifdef HAVE_REPLICATION
void pack_info(THD *thd, Protocol *protocol);
#endif
@@ -3171,12 +3181,22 @@ public:
const Format_description_log_event *description_event);
~Gtid_list_log_event() { my_free(list); }
Log_event_type get_type_code() { return GTID_LIST_EVENT; }
- int get_data_size() { return GTID_LIST_HEADER_LEN + count*element_size; }
+ int get_data_size() {
+ /*
+ Replacing with dummy event, needed for older slaves, requires a minimum
+ of 6 bytes in the body.
+ */
+ return (count==0 ?
+ GTID_LIST_HEADER_LEN+2 : GTID_LIST_HEADER_LEN+count*element_size);
+ }
bool is_valid() const { return list != NULL; }
-#ifdef MYSQL_SERVER
+#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
+ bool to_packet(String *packet);
bool write(IO_CACHE *file);
+ virtual int do_apply_event(Relay_log_info const *rli);
#endif
static bool peek(const char *event_start, uint32 event_len,
+ uint8 checksum_alg,
rpl_gtid **out_gtid_list, uint32 *out_list_len);
};
diff --git a/sql/multi_range_read.cc b/sql/multi_range_read.cc
index b6133eac3ae..3ea65b16e3d 100644
--- a/sql/multi_range_read.cc
+++ b/sql/multi_range_read.cc
@@ -1199,9 +1199,9 @@ bool DsMrr_impl::setup_buffer_sharing(uint key_size_in_keybuf,
statistics?
*/
uint parts= my_count_bits(key_tuple_map);
- ulong rpc;
+ ha_rows rpc;
ulonglong rowids_size= rowid_buf_elem_size;
- if ((rpc= key_info->actual_rec_per_key(parts - 1)))
+ if ((rpc= (ha_rows) key_info->actual_rec_per_key(parts - 1)))
rowids_size= rowid_buf_elem_size * rpc;
double fraction_for_rowids=
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index fd8b990b6c3..0a49eb0c7ee 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -681,7 +681,7 @@ mysql_mutex_t
mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats,
LOCK_global_table_stats, LOCK_global_index_stats;
-mysql_mutex_t LOCK_gtid_counter, LOCK_rpl_gtid_state;
+mysql_mutex_t LOCK_rpl_gtid_state;
/**
The below lock protects access to two global server variables:
@@ -850,7 +850,7 @@ PSI_mutex_key key_LOCK_stats,
key_LOCK_global_index_stats,
key_LOCK_wakeup_ready;
-PSI_mutex_key key_LOCK_gtid_counter, key_LOCK_rpl_gtid_state;
+PSI_mutex_key key_LOCK_rpl_gtid_state;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
@@ -895,7 +895,6 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_global_table_stats, "LOCK_global_table_stats", PSI_FLAG_GLOBAL},
{ &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL},
{ &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0},
- { &key_LOCK_gtid_counter, "LOCK_gtid_counter", PSI_FLAG_GLOBAL},
{ &key_LOCK_rpl_gtid_state, "LOCK_rpl_gtid_state", PSI_FLAG_GLOBAL},
{ &key_LOCK_thd_data, "THD::LOCK_thd_data", 0},
{ &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL},
@@ -1394,10 +1393,7 @@ struct st_VioSSLFd *ssl_acceptor_fd;
*/
uint connection_count= 0, extra_connection_count= 0;
-/**
- Running counter for generating new GTIDs locally.
-*/
-uint64 global_gtid_counter= 0;
+my_bool opt_gtid_strict_mode= FALSE;
/* Function declarations */
@@ -2073,7 +2069,6 @@ static void clean_up_mutexes()
mysql_mutex_destroy(&LOCK_global_user_client_stats);
mysql_mutex_destroy(&LOCK_global_table_stats);
mysql_mutex_destroy(&LOCK_global_index_stats);
- mysql_mutex_destroy(&LOCK_gtid_counter);
mysql_mutex_destroy(&LOCK_rpl_gtid_state);
#ifdef HAVE_OPENSSL
mysql_mutex_destroy(&LOCK_des_key_file);
@@ -4250,8 +4245,6 @@ static int init_thread_environment()
&LOCK_global_table_stats, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_global_index_stats,
&LOCK_global_index_stats, MY_MUTEX_INIT_FAST);
- mysql_mutex_init(key_LOCK_gtid_counter,
- &LOCK_gtid_counter, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_rpl_gtid_state,
&LOCK_rpl_gtid_state, MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_LOCK_prepare_ordered, &LOCK_prepare_ordered,
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 0717bcff718..e07b5d5c41c 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -255,7 +255,7 @@ extern PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats, key_LOCK_wakeup_ready;
-extern PSI_mutex_key key_LOCK_gtid_counter, key_LOCK_rpl_gtid_state;
+extern PSI_mutex_key key_LOCK_rpl_gtid_state;
extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave,
@@ -476,7 +476,7 @@ extern mysql_mutex_t
LOCK_slave_list, LOCK_active_mi, LOCK_manager,
LOCK_global_system_variables, LOCK_user_conn,
LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count;
-extern mysql_mutex_t LOCK_gtid_counter, LOCK_rpl_gtid_state;
+extern mysql_mutex_t LOCK_rpl_gtid_state;
extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count;
#ifdef HAVE_OPENSSL
extern mysql_mutex_t LOCK_des_key_file;
@@ -677,6 +677,7 @@ extern handlerton *maria_hton;
extern uint extra_connection_count;
extern uint64 global_gtid_counter;
+extern my_bool opt_gtid_strict_mode;
extern my_bool opt_userstat_running, debug_assert_if_crashed_table;
extern uint mysqld_extra_port;
extern ulong opt_progress_report_time;
diff --git a/sql/opt_range.cc b/sql/opt_range.cc
index f20e3ed7739..37a7944bbaf 100644
--- a/sql/opt_range.cc
+++ b/sql/opt_range.cc
@@ -5919,8 +5919,8 @@ ha_rows records_in_index_intersect_extension(PARTIAL_INDEX_INTERSECT_INFO *curr,
ha_rows ext_records= ext_index_scan->records;
if (i < used_key_parts)
{
- ulong f1= key_info->actual_rec_per_key(i-1);
- ulong f2= key_info->actual_rec_per_key(i);
+ double f1= key_info->actual_rec_per_key(i-1);
+ double f2= key_info->actual_rec_per_key(i);
ext_records= (ha_rows) ((double) ext_records / f2 * f1);
}
if (ext_records < table_cardinality)
@@ -13157,11 +13157,11 @@ void cost_group_min_max(TABLE* table, KEY *index_info, uint used_key_parts,
double *read_cost, ha_rows *records)
{
ha_rows table_records;
- uint num_groups;
- uint num_blocks;
- uint keys_per_block;
- uint keys_per_group;
- uint keys_per_subgroup; /* Average number of keys in sub-groups */
+ ha_rows num_groups;
+ ha_rows num_blocks;
+ uint keys_per_block;
+ ha_rows keys_per_group;
+ ha_rows keys_per_subgroup; /* Average number of keys in sub-groups */
/* formed by a key infix. */
double p_overlap; /* Probability that a sub-group overlaps two blocks. */
double quick_prefix_selectivity;
@@ -13170,24 +13170,24 @@ void cost_group_min_max(TABLE* table, KEY *index_info, uint used_key_parts,
DBUG_ENTER("cost_group_min_max");
table_records= table->stat_records();
- keys_per_block= (table->file->stats.block_size / 2 /
- (index_info->key_length + table->file->ref_length)
- + 1);
- num_blocks= (uint)(table_records / keys_per_block) + 1;
+ keys_per_block= (uint) (table->file->stats.block_size / 2 /
+ (index_info->key_length + table->file->ref_length)
+ + 1);
+ num_blocks= (ha_rows)(table_records / keys_per_block) + 1;
/* Compute the number of keys in a group. */
- keys_per_group= index_info->actual_rec_per_key(group_key_parts - 1);
+ keys_per_group= (ha_rows) index_info->actual_rec_per_key(group_key_parts - 1);
if (keys_per_group == 0) /* If there is no statistics try to guess */
/* each group contains 10% of all records */
- keys_per_group= (uint)(table_records / 10) + 1;
- num_groups= (uint)(table_records / keys_per_group) + 1;
+ keys_per_group= (table_records / 10) + 1;
+ num_groups= (table_records / keys_per_group) + 1;
/* Apply the selectivity of the quick select for group prefixes. */
if (range_tree && (quick_prefix_records != HA_POS_ERROR))
{
quick_prefix_selectivity= (double) quick_prefix_records /
(double) table_records;
- num_groups= (uint) rint(num_groups * quick_prefix_selectivity);
+ num_groups= (ha_rows) rint(num_groups * quick_prefix_selectivity);
set_if_bigger(num_groups, 1);
}
@@ -13196,7 +13196,7 @@ void cost_group_min_max(TABLE* table, KEY *index_info, uint used_key_parts,
Compute the probability that two ends of a subgroup are inside
different blocks.
*/
- keys_per_subgroup= index_info->actual_rec_per_key(used_key_parts - 1);
+ keys_per_subgroup= (ha_rows) index_info->actual_rec_per_key(used_key_parts - 1);
if (keys_per_subgroup >= keys_per_block) /* If a subgroup is bigger than */
p_overlap= 1.0; /* a block, it will overlap at least two blocks. */
else
@@ -13224,9 +13224,9 @@ void cost_group_min_max(TABLE* table, KEY *index_info, uint used_key_parts,
*records= num_groups;
DBUG_PRINT("info",
- ("table rows: %lu keys/block: %u keys/group: %u result rows: %lu blocks: %u",
- (ulong)table_records, keys_per_block, keys_per_group,
- (ulong) *records, num_blocks));
+ ("table rows: %lu keys/block: %u keys/group: %lu result rows: %lu blocks: %lu",
+ (ulong)table_records, keys_per_block, (ulong) keys_per_group,
+ (ulong) *records, (ulong) num_blocks));
DBUG_VOID_RETURN;
}
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index d6a6ed90bd3..b34b890060b 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -29,7 +29,7 @@
const LEX_STRING rpl_gtid_slave_state_table_name=
- { C_STRING_WITH_LEN("rpl_slave_state") };
+ { C_STRING_WITH_LEN("gtid_slave_pos") };
void
@@ -73,7 +73,7 @@ rpl_slave_state::record_and_update_gtid(THD *thd, Relay_log_info *rli)
if ((sub_id= rli->gtid_sub_id))
{
rli->gtid_sub_id= 0;
- if (record_gtid(thd, &rli->current_gtid, sub_id, false))
+ if (record_gtid(thd, &rli->current_gtid, sub_id, false, false))
return 1;
update_state_hash(sub_id, &rli->current_gtid);
}
@@ -186,8 +186,6 @@ rpl_slave_state::truncate_state_table(THD *thd)
int err= 0;
TABLE *table;
- mysql_reset_thd_for_next_command(thd, 0);
-
tlist.init_one_table(STRING_WITH_LEN("mysql"),
rpl_gtid_slave_state_table_name.str,
rpl_gtid_slave_state_table_name.length,
@@ -234,7 +232,7 @@ static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= {
static const uint mysql_rpl_slave_state_pk_parts[]= {0, 1};
-static const TABLE_FIELD_DEF mysql_rpl_slave_state_tabledef= {
+static const TABLE_FIELD_DEF mysql_gtid_slave_pos_tabledef= {
array_elements(mysql_rpl_slave_state_coltypes),
mysql_rpl_slave_state_coltypes,
array_elements(mysql_rpl_slave_state_pk_parts),
@@ -256,14 +254,14 @@ protected:
static Gtid_db_intact gtid_table_intact;
/*
- Check that the mysql.rpl_slave_state table has the correct definition.
+ Check that the mysql.gtid_slave_pos table has the correct definition.
*/
int
gtid_check_rpl_slave_state_table(TABLE *table)
{
int err;
- if ((err= gtid_table_intact.check(table, &mysql_rpl_slave_state_tabledef)))
+ if ((err= gtid_table_intact.check(table, &mysql_gtid_slave_pos_tabledef)))
my_error(ER_GTID_OPEN_TABLE_FAILED, MYF(0), "mysql",
rpl_gtid_slave_state_table_name.str);
return err;
@@ -286,7 +284,7 @@ gtid_check_rpl_slave_state_table(TABLE *table)
*/
int
rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
- bool in_transaction)
+ bool in_transaction, bool in_statement)
{
TABLE_LIST tlist;
int err= 0;
@@ -297,7 +295,8 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
ulonglong thd_saved_option= thd->variables.option_bits;
Query_tables_list lex_backup;
- mysql_reset_thd_for_next_command(thd, 0);
+ if (!in_statement)
+ mysql_reset_thd_for_next_command(thd, 0);
DBUG_EXECUTE_IF("gtid_inject_record_gtid",
{
@@ -371,7 +370,10 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
}
table->file->ha_index_end();
- mysql_bin_log.bump_seq_no_counter_if_needed(gtid->seq_no);
+ if(!err && opt_bin_log &&
+ (err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id,
+ gtid->seq_no)))
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
end:
@@ -626,7 +628,7 @@ gtid_parser_helper(char **ptr, char *end, rpl_gtid *out_gtid)
*/
int
rpl_slave_state::load(THD *thd, char *state_from_master, size_t len,
- bool reset)
+ bool reset, bool in_statement)
{
char *end= state_from_master + len;
@@ -645,7 +647,7 @@ rpl_slave_state::load(THD *thd, char *state_from_master, size_t len,
if (gtid_parser_helper(&state_from_master, end, &gtid) ||
!(sub_id= next_subid(gtid.domain_id)) ||
- record_gtid(thd, &gtid, sub_id, false) ||
+ record_gtid(thd, &gtid, sub_id, false, in_statement) ||
update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no))
return 1;
if (state_from_master == end)
@@ -686,6 +688,7 @@ rpl_binlog_state::rpl_binlog_state()
sizeof(uint32), NULL, my_free, HASH_UNIQUE);
mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state,
MY_MUTEX_INIT_SLOW);
+ initialized= 1;
}
@@ -699,11 +702,36 @@ rpl_binlog_state::reset()
my_hash_reset(&hash);
}
-rpl_binlog_state::~rpl_binlog_state()
+void rpl_binlog_state::free()
+{
+ if (initialized)
+ {
+ initialized= 0;
+ reset();
+ my_hash_free(&hash);
+ mysql_mutex_destroy(&LOCK_binlog_state);
+ }
+}
+
+
+bool
+rpl_binlog_state::load(struct rpl_gtid *list, uint32 count)
{
+ uint32 i;
+
reset();
- my_hash_free(&hash);
- mysql_mutex_destroy(&LOCK_binlog_state);
+ for (i= 0; i < count; ++i)
+ {
+ if (update(&(list[i]), false))
+ return true;
+ }
+ return false;
+}
+
+
+rpl_binlog_state::~rpl_binlog_state()
+{
+ free();
}
@@ -716,48 +744,111 @@ rpl_binlog_state::~rpl_binlog_state()
Returns 0 for ok, 1 for error.
*/
int
-rpl_binlog_state::update(const struct rpl_gtid *gtid)
+rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict)
{
- rpl_gtid *lookup_gtid;
element *elem;
- elem= (element *)my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0);
- if (elem)
+ if ((elem= (element *)my_hash_search(&hash,
+ (const uchar *)(&gtid->domain_id), 0)))
{
- /*
- By far the most common case is that successive events within same
- replication domain have the same server id (it changes only when
- switching to a new master). So save a hash lookup in this case.
- */
- if (likely(elem->last_gtid->server_id == gtid->server_id))
+ if (strict && elem->last_gtid && elem->last_gtid->seq_no >= gtid->seq_no)
{
- elem->last_gtid->seq_no= gtid->seq_no;
- return 0;
+ my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), gtid->domain_id,
+ gtid->server_id, gtid->seq_no, elem->last_gtid->domain_id,
+ elem->last_gtid->server_id, elem->last_gtid->seq_no);
+ return 1;
}
+ if (elem->seq_no_counter < gtid->seq_no)
+ elem->seq_no_counter= gtid->seq_no;
+ if (!elem->update_element(gtid))
+ return 0;
+ }
+ else if (!alloc_element(gtid))
+ return 0;
- lookup_gtid= (rpl_gtid *)
- my_hash_search(&elem->hash, (const uchar *)&gtid->server_id, 0);
- if (lookup_gtid)
- {
- lookup_gtid->seq_no= gtid->seq_no;
- elem->last_gtid= lookup_gtid;
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+}
+
+
+/*
+ Fill in a new GTID, allocating next sequence number, and update state
+ accordingly.
+*/
+int
+rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id,
+ rpl_gtid *gtid)
+{
+ element *elem;
+
+ gtid->domain_id= domain_id;
+ gtid->server_id= server_id;
+
+ if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
+ {
+ gtid->seq_no= ++elem->seq_no_counter;
+ if (!elem->update_element(gtid))
return 0;
- }
+ }
+ else
+ {
+ gtid->seq_no= 1;
+ if (!alloc_element(gtid))
+ return 0;
+ }
- /* Allocate a new GTID and insert it. */
- lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
- if (!lookup_gtid)
- return 1;
- memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
- if (my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
- {
- my_free(lookup_gtid);
- return 1;
- }
- elem->last_gtid= lookup_gtid;
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+}
+
+
+/* Helper functions for update. */
+int
+rpl_binlog_state::element::update_element(const rpl_gtid *gtid)
+{
+ rpl_gtid *lookup_gtid;
+
+ /*
+ By far the most common case is that successive events within same
+ replication domain have the same server id (it changes only when
+ switching to a new master). So save a hash lookup in this case.
+ */
+ if (likely(last_gtid && last_gtid->server_id == gtid->server_id))
+ {
+ last_gtid->seq_no= gtid->seq_no;
return 0;
}
+ lookup_gtid= (rpl_gtid *)
+ my_hash_search(&hash, (const uchar *)&gtid->server_id, 0);
+ if (lookup_gtid)
+ {
+ lookup_gtid->seq_no= gtid->seq_no;
+ last_gtid= lookup_gtid;
+ return 0;
+ }
+
+ /* Allocate a new GTID and insert it. */
+ lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
+ if (!lookup_gtid)
+ return 1;
+ memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
+ if (my_hash_insert(&hash, (const uchar *)lookup_gtid))
+ {
+ my_free(lookup_gtid);
+ return 1;
+ }
+ last_gtid= lookup_gtid;
+ return 0;
+}
+
+
+int
+rpl_binlog_state::alloc_element(const rpl_gtid *gtid)
+{
+ element *elem;
+ rpl_gtid *lookup_gtid;
+
/* First time we see this domain_id; allocate a new element. */
elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME));
lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
@@ -768,6 +859,7 @@ rpl_binlog_state::update(const struct rpl_gtid *gtid)
offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
HASH_UNIQUE);
elem->last_gtid= lookup_gtid;
+ elem->seq_no_counter= gtid->seq_no;
memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
{
@@ -787,23 +879,64 @@ rpl_binlog_state::update(const struct rpl_gtid *gtid)
}
-uint64
-rpl_binlog_state::seq_no_from_state()
+/*
+ Check that a new GTID can be logged without creating an out-of-order
+ sequence number with existing GTIDs.
+*/
+bool
+rpl_binlog_state::check_strict_sequence(uint32 domain_id, uint32 server_id,
+ uint64 seq_no)
{
- ulong i, j;
- uint64 seq_no= 0;
+ element *elem;
- for (i= 0; i < hash.records; ++i)
+ if ((elem= (element *)my_hash_search(&hash,
+ (const uchar *)(&domain_id), 0)) &&
+ elem->last_gtid && elem->last_gtid->seq_no >= seq_no)
{
- element *e= (element *)my_hash_element(&hash, i);
- for (j= 0; j < e->hash.records; ++j)
- {
- const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
- if (gtid->seq_no > seq_no)
- seq_no= gtid->seq_no;
- }
+ my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), domain_id, server_id, seq_no,
+ elem->last_gtid->domain_id, elem->last_gtid->server_id,
+ elem->last_gtid->seq_no);
+ return 1;
}
- return seq_no;
+ return 0;
+}
+
+
+/*
+ When we see a new GTID that will not be binlogged (eg. slave thread
+ with --log-slave-updates=0), then we need to remember to allocate any
+ GTID seq_no of our own within that domain starting from there.
+
+ Returns 0 if ok, non-zero if out-of-memory.
+*/
+int
+rpl_binlog_state::bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no)
+{
+ element *elem;
+
+ if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
+ {
+ if (elem->seq_no_counter < seq_no)
+ elem->seq_no_counter= seq_no;
+ return 0;
+ }
+
+ /* We need to allocate a new, empty element to remember the next seq_no. */
+ if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME))))
+ return 1;
+
+ elem->domain_id= domain_id;
+ my_hash_init(&elem->hash, &my_charset_bin, 32,
+ offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
+ HASH_UNIQUE);
+ elem->last_gtid= NULL;
+ elem->seq_no_counter= seq_no;
+ if (0 == my_hash_insert(&hash, (const uchar *)elem))
+ return 0;
+
+ my_hash_free(&elem->hash);
+ my_free(elem);
+ return 1;
}
@@ -824,6 +957,11 @@ rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
{
size_t res;
element *e= (element *)my_hash_element(&hash, i);
+ if (!e->last_gtid)
+ {
+ DBUG_ASSERT(e->hash.records == 0);
+ continue;
+ }
for (j= 0; j <= e->hash.records; ++j)
{
const rpl_gtid *gtid;
@@ -865,7 +1003,7 @@ rpl_binlog_state::read_from_iocache(IO_CACHE *src)
end= buf + res;
if (gtid_parser_helper(&p, end, &gtid))
return 1;
- if (update(&gtid))
+ if (update(&gtid, false))
return 1;
}
return 0;
@@ -881,6 +1019,17 @@ rpl_binlog_state::find(uint32 domain_id, uint32 server_id)
return (rpl_gtid *)my_hash_search(&elem->hash, (const uchar *)&server_id, 0);
}
+rpl_gtid *
+rpl_binlog_state::find_most_recent(uint32 domain_id)
+{
+ element *elem;
+
+ elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
+ if (elem && elem->last_gtid)
+ return elem->last_gtid;
+ return NULL;
+}
+
uint32
rpl_binlog_state::count()
@@ -904,6 +1053,11 @@ rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
for (i= 0; i < hash.records; ++i)
{
element *e= (element *)my_hash_element(&hash, i);
+ if (!e->last_gtid)
+ {
+ DBUG_ASSERT(e->hash.records==0);
+ continue;
+ }
for (j= 0; j <= e->hash.records; ++j)
{
const rpl_gtid *gtid;
@@ -940,20 +1094,44 @@ int
rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
{
uint32 i;
+ uint32 alloc_size, out_size;
- *size= hash.records;
- if (!(*list= (rpl_gtid *)my_malloc(*size * sizeof(rpl_gtid), MYF(MY_WME))))
+ alloc_size= hash.records;
+ if (!(*list= (rpl_gtid *)my_malloc(alloc_size * sizeof(rpl_gtid),
+ MYF(MY_WME))))
return 1;
- for (i= 0; i < *size; ++i)
+ out_size= 0;
+ for (i= 0; i < alloc_size; ++i)
{
element *e= (element *)my_hash_element(&hash, i);
- memcpy(&((*list)[i]), e->last_gtid, sizeof(rpl_gtid));
+ if (!e->last_gtid)
+ continue;
+ memcpy(&((*list)[out_size++]), e->last_gtid, sizeof(rpl_gtid));
}
+ *size= out_size;
return 0;
}
+bool
+rpl_binlog_state::append_pos(String *str)
+{
+ uint32 i;
+ bool first= true;
+
+ for (i= 0; i < hash.records; ++i)
+ {
+ element *e= (element *)my_hash_element(&hash, i);
+ if (e->last_gtid &&
+ rpl_slave_state_tostring_helper(str, e->last_gtid, &first))
+ return true;
+ }
+
+ return false;
+}
+
+
slave_connection_state::slave_connection_state()
{
my_hash_init(&hash, &my_charset_bin, 32,
@@ -1107,10 +1285,17 @@ slave_connection_state::remove(const rpl_gtid *in_gtid)
int
slave_connection_state::to_string(String *out_str)
{
+ out_str->length(0);
+ return append_to_string(out_str);
+}
+
+
+int
+slave_connection_state::append_to_string(String *out_str)
+{
uint32 i;
bool first;
- out_str->length(0);
first= true;
for (i= 0; i < hash.records; ++i)
{
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index e63d8439803..fefce684c2c 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -91,11 +91,12 @@ struct rpl_slave_state
int update(uint32 domain_id, uint32 server_id, uint64 sub_id, uint64 seq_no);
int truncate_state_table(THD *thd);
int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
- bool in_transaction);
+ bool in_transaction, bool in_statement);
uint64 next_subid(uint32 domain_id);
int tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra);
bool domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid);
- int load(THD *thd, char *state_from_master, size_t len, bool reset);
+ int load(THD *thd, char *state_from_master, size_t len, bool reset,
+ bool in_statement);
bool is_empty();
void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
@@ -130,24 +131,37 @@ struct rpl_binlog_state
HASH hash; /* Containing all server_id for one domain_id */
/* The most recent entry in the hash. */
rpl_gtid *last_gtid;
+ /* Counter to allocate next seq_no for this domain. */
+ uint64 seq_no_counter;
+
+ int update_element(const rpl_gtid *gtid);
};
/* Mapping from domain_id to collection of elements. */
HASH hash;
/* Mutex protecting access to the state. */
mysql_mutex_t LOCK_binlog_state;
+ my_bool initialized;
rpl_binlog_state();
~rpl_binlog_state();
void reset();
- int update(const struct rpl_gtid *gtid);
- uint64 seq_no_from_state();
+ void free();
+ bool load(struct rpl_gtid *list, uint32 count);
+ int update(const struct rpl_gtid *gtid, bool strict);
+ int update_with_next_gtid(uint32 domain_id, uint32 server_id,
+ rpl_gtid *gtid);
+ int alloc_element(const rpl_gtid *gtid);
+ bool check_strict_sequence(uint32 domain_id, uint32 server_id, uint64 seq_no);
+ int bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no);
int write_to_iocache(IO_CACHE *dest);
int read_from_iocache(IO_CACHE *src);
uint32 count();
int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
+ bool append_pos(String *str);
rpl_gtid *find(uint32 domain_id, uint32 server_id);
+ rpl_gtid *find_most_recent(uint32 domain_id);
};
@@ -170,6 +184,7 @@ struct slave_connection_state
void remove(const rpl_gtid *gtid);
ulong count() const { return hash.records; }
int to_string(String *out_str);
+ int append_to_string(String *out_str);
};
extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index d7ef562a5ff..a68fd055d74 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -37,7 +37,8 @@ Master_info::Master_info(LEX_STRING *connection_name_arg,
checksum_alg_before_fd(BINLOG_CHECKSUM_ALG_UNDEF),
connect_retry(DEFAULT_CONNECT_RETRY), inited(0), abort_slave(0),
slave_running(0), slave_run_id(0), sync_counter(0),
- heartbeat_period(0), received_heartbeats(0), master_id(0), using_gtid(0)
+ heartbeat_period(0), received_heartbeats(0), master_id(0),
+ using_gtid(USE_GTID_NO)
{
host[0] = 0; user[0] = 0; password[0] = 0;
ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
@@ -153,7 +154,7 @@ void init_master_log_pos(Master_info* mi)
mi->master_log_name[0] = 0;
mi->master_log_pos = BIN_LOG_HEADER_SIZE; // skip magic number
- mi->using_gtid= false;
+ mi->using_gtid= Master_info::USE_GTID_NO;
/* Intentionally init ssl_verify_server_cert to 0, no option available */
mi->ssl_verify_server_cert= 0;
@@ -481,7 +482,15 @@ file '%s')", fname);
while (!init_strvar_from_file(buf, sizeof(buf), &mi->file, 0))
{
if (0 == strncmp(buf, STRING_WITH_LEN("using_gtid=")))
- mi->using_gtid= (0 != atoi(buf + sizeof("using_gtid")));
+ {
+ int val= atoi(buf + sizeof("using_gtid"));
+ if (val == Master_info::USE_GTID_CURRENT_POS)
+ mi->using_gtid= Master_info::USE_GTID_CURRENT_POS;
+ else if (val == Master_info::USE_GTID_SLAVE_POS)
+ mi->using_gtid= Master_info::USE_GTID_SLAVE_POS;
+ else
+ mi->using_gtid= Master_info::USE_GTID_NO;
+ }
}
}
}
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index 9958f9a5cea..7e3709993ed 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -131,10 +131,14 @@ class Master_info : public Slave_reporting_capability
DYNAMIC_ARRAY ignore_server_ids;
ulong master_id;
/*
- True if slave position is set using GTID state rather than old-style
- file/offset binlog position.
+ Which kind of GTID position (if any) is used when connecting to master.
+
+ Note that you can not change the numeric values of these, they are used
+ in master.info.
*/
- bool using_gtid;
+ enum {
+ USE_GTID_NO= 0, USE_GTID_CURRENT_POS= 1, USE_GTID_SLAVE_POS= 2
+ } using_gtid;
};
int init_master_info(Master_info* mi, const char* master_info_fname,
const char* slave_info_fname,
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 6add94580a3..543c6d9fffe 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1097,7 +1097,8 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
ulonglong log_pos;
DBUG_ENTER("Relay_log_info::is_until_satisfied");
- DBUG_ASSERT(until_condition != UNTIL_NONE);
+ DBUG_ASSERT(until_condition == UNTIL_MASTER_POS ||
+ until_condition == UNTIL_RELAY_POS);
if (until_condition == UNTIL_MASTER_POS)
{
@@ -1397,7 +1398,6 @@ rpl_load_gtid_slave_state(THD *thd)
HASH hash;
int err= 0;
uint32 i;
- uint64 highest_seq_no= 0;
DBUG_ENTER("rpl_load_gtid_slave_state");
rpl_global_gtid_slave_state.lock();
@@ -1450,8 +1450,6 @@ rpl_load_gtid_slave_state(THD *thd)
DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu\n",
(unsigned)domain_id, (unsigned)server_id,
(ulong)seq_no, (ulong)sub_id));
- if (seq_no > highest_seq_no)
- highest_seq_no= seq_no;
if ((rec= my_hash_search(&hash, (const uchar *)&domain_id, 0)))
{
@@ -1495,6 +1493,14 @@ rpl_load_gtid_slave_state(THD *thd)
rpl_global_gtid_slave_state.unlock();
goto end;
}
+ if (opt_bin_log &&
+ mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id,
+ entry->gtid.seq_no))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ rpl_global_gtid_slave_state.unlock();
+ goto end;
+ }
}
rpl_global_gtid_slave_state.loaded= true;
rpl_global_gtid_slave_state.unlock();
@@ -1514,7 +1520,6 @@ end:
thd->mdl_context.release_transactional_locks();
}
my_hash_free(&hash);
- mysql_bin_log.bump_seq_no_counter_if_needed(highest_seq_no);
DBUG_RETURN(err);
}
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 7aff6720aac..6dd757343fd 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -263,7 +263,9 @@ public:
thread is running).
*/
- enum {UNTIL_NONE= 0, UNTIL_MASTER_POS, UNTIL_RELAY_POS} until_condition;
+ enum {
+ UNTIL_NONE= 0, UNTIL_MASTER_POS, UNTIL_RELAY_POS, UNTIL_GTID
+ } until_condition;
char until_log_name[FN_REFLEN];
ulonglong until_log_pos;
/* extension extracted from log_name and converted to int */
@@ -277,6 +279,8 @@ public:
UNTIL_LOG_NAMES_CMP_UNKNOWN= -2, UNTIL_LOG_NAMES_CMP_LESS= -1,
UNTIL_LOG_NAMES_CMP_EQUAL= 0, UNTIL_LOG_NAMES_CMP_GREATER= 1
} until_log_names_cmp_result;
+ /* Condition for UNTIL master_gtid_pos. */
+ slave_connection_state until_gtid_pos;
char cached_charset[6];
/*
@@ -354,6 +358,8 @@ public:
bool is_until_satisfied(THD *thd, Log_event *ev);
inline ulonglong until_pos()
{
+ DBUG_ASSERT(until_condition == UNTIL_MASTER_POS ||
+ until_condition == UNTIL_RELAY_POS);
return ((until_condition == UNTIL_MASTER_POS) ? group_master_log_pos :
group_relay_log_pos);
}
diff --git a/sql/set_var.cc b/sql/set_var.cc
index 18d86a13998..363f03579ef 100644
--- a/sql/set_var.cc
+++ b/sql/set_var.cc
@@ -311,7 +311,7 @@ longlong sys_var::val_int(bool *is_null,
{
case_get_string_as_lex_string;
case_for_integers(return val);
- case_for_double(return val);
+ case_for_double(return (longlong) val);
default:
my_error(ER_VAR_CANT_BE_READ, MYF(0), name.str);
return 0;
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index 95caf1f43e5..ad055273155 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -6762,6 +6762,12 @@ ER_GTID_POSITION_NOT_FOUND_IN_BINLOG
ER_CANNOT_LOAD_SLAVE_GTID_STATE
eng "Failed to load replication slave GTID state from table %s.%s"
ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG
- eng "Requested GTID_POS %u-%u-%llu conflicts with the binary log which contains a more recent GTID %u-%u-%llu. To use the requested GTID_POS, the old binlog must be removed with RESET MASTER to avoid out-of-order binlog"
+ eng "Specified GTID %u-%u-%llu conflicts with the binary log which contains a more recent GTID %u-%u-%llu. If MASTER_GTID_POS=CURRENT_POS is used, the binlog position will override the new value of @@gtid_slave_pos."
ER_MASTER_GTID_POS_MISSING_DOMAIN
- eng "Requested GTID_POS contains no value for replication domain %u. This conflicts with the binary log which contains GTID %u-%u-%llu. To use the requested GTID_POS, the old binlog must be removed with RESET MASTER to avoid out-of-order binlog"
+ eng "Specified value for @@gtid_slave_pos contains no value for replication domain %u. This conflicts with the binary log which contains GTID %u-%u-%llu. If MASTER_GTID_POS=CURRENT_POS is used, the binlog position will override the new value of @@gtid_slave_pos."
+ER_UNTIL_REQUIRES_USING_GTID
+ eng "START SLAVE UNTIL master_gtid_pos requires that slave is using GTID"
+ER_GTID_STRICT_OUT_OF_ORDER
+ eng "An attempt was made to binlog GTID %u-%u-%llu which would create an out-of-order sequence number with existing GTID %u-%u-%llu, and gtid strict mode is enabled."
+ER_GTID_START_FROM_BINLOG_HOLE
+ eng "The binlog on the master is missing the GTID %u-%u-%llu requested by the slave (even though both a prior and a subsequent sequence number does exist), and GTID strict mode is enabled"
diff --git a/sql/slave.cc b/sql/slave.cc
index 407b4a73a72..c7f4dc08096 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -824,7 +824,8 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
while one of the threads is running, they are in use and cannot be
removed.
*/
- if (mi->using_gtid && !mi->slave_running && !mi->rli.slave_running)
+ if (mi->using_gtid != Master_info::USE_GTID_NO &&
+ !mi->slave_running && !mi->rli.slave_running)
{
purge_relay_logs(&mi->rli, NULL, 0, &errmsg);
mi->master_log_name[0]= 0;
@@ -1833,12 +1834,12 @@ after_set_capability:
restart or reconnect, we might end up re-fetching and hence re-applying
the same event(s) again.
*/
- if (mi->using_gtid && !mi->master_log_name[0])
+ if (mi->using_gtid != Master_info::USE_GTID_NO && !mi->master_log_name[0])
{
int rc;
char str_buf[256];
- String connect_state(str_buf, sizeof(str_buf), system_charset_info);
- connect_state.length(0);
+ String query_str(str_buf, sizeof(str_buf), system_charset_info);
+ query_str.length(0);
/*
Read the master @@GLOBAL.gtid_domain_id variable.
@@ -1861,9 +1862,11 @@ after_set_capability:
mysql_free_result(master_res);
master_res= NULL;
- connect_state.append(STRING_WITH_LEN("SET @slave_connect_state='"),
- system_charset_info);
- if (rpl_append_gtid_state(&connect_state, true))
+ query_str.append(STRING_WITH_LEN("SET @slave_connect_state='"),
+ system_charset_info);
+ if (rpl_append_gtid_state(&query_str,
+ mi->using_gtid ==
+ Master_info::USE_GTID_CURRENT_POS))
{
err_code= ER_OUTOFMEMORY;
errmsg= "The slave I/O thread stops because a fatal out-of-memory "
@@ -1871,9 +1874,9 @@ after_set_capability:
sprintf(err_buff, "%s Error: Out of memory", errmsg);
goto err;
}
- connect_state.append(STRING_WITH_LEN("'"), system_charset_info);
+ query_str.append(STRING_WITH_LEN("'"), system_charset_info);
- rc= mysql_real_query(mysql, connect_state.ptr(), connect_state.length());
+ rc= mysql_real_query(mysql, query_str.ptr(), query_str.length());
if (rc)
{
err_code= mysql_errno(mysql);
@@ -1893,8 +1896,78 @@ after_set_capability:
goto err;
}
}
+
+ query_str.length(0);
+ if (query_str.append(STRING_WITH_LEN("SET @slave_gtid_strict_mode="),
+ system_charset_info) ||
+ query_str.append_ulonglong(opt_gtid_strict_mode != false))
+ {
+ err_code= ER_OUTOFMEMORY;
+ errmsg= "The slave I/O thread stops because a fatal out-of-memory "
+ "error is encountered when it tries to set @slave_gtid_strict_mode.";
+ sprintf(err_buff, "%s Error: Out of memory", errmsg);
+ goto err;
+ }
+
+ rc= mysql_real_query(mysql, query_str.ptr(), query_str.length());
+ if (rc)
+ {
+ err_code= mysql_errno(mysql);
+ if (is_network_error(err_code))
+ {
+ mi->report(ERROR_LEVEL, err_code,
+ "Setting @slave_gtid_strict_mode failed with error: %s",
+ mysql_error(mysql));
+ goto network_err;
+ }
+ else
+ {
+ /* Fatal error */
+ errmsg= "The slave I/O thread stops because a fatal error is "
+ "encountered when it tries to set @slave_gtid_strict_mode.";
+ sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+ goto err;
+ }
+ }
+
+ if (mi->rli.until_condition == Relay_log_info::UNTIL_GTID)
+ {
+ query_str.length(0);
+ query_str.append(STRING_WITH_LEN("SET @slave_until_gtid='"),
+ system_charset_info);
+ if (mi->rli.until_gtid_pos.append_to_string(&query_str))
+ {
+ err_code= ER_OUTOFMEMORY;
+ errmsg= "The slave I/O thread stops because a fatal out-of-memory "
+ "error is encountered when it tries to compute @slave_until_gtid.";
+ sprintf(err_buff, "%s Error: Out of memory", errmsg);
+ goto err;
+ }
+ query_str.append(STRING_WITH_LEN("'"), system_charset_info);
+
+ rc= mysql_real_query(mysql, query_str.ptr(), query_str.length());
+ if (rc)
+ {
+ err_code= mysql_errno(mysql);
+ if (is_network_error(err_code))
+ {
+ mi->report(ERROR_LEVEL, err_code,
+ "Setting @slave_until_gtid failed with error: %s",
+ mysql_error(mysql));
+ goto network_err;
+ }
+ else
+ {
+ /* Fatal error */
+ errmsg= "The slave I/O thread stops because a fatal error is "
+ "encountered when it tries to set @slave_until_gtid.";
+ sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+ goto err;
+ }
+ }
+ }
}
- if (!mi->using_gtid)
+ if (mi->using_gtid == Master_info::USE_GTID_NO)
{
/*
If we are not using GTID to connect this time, then instead request
@@ -1920,7 +1993,7 @@ after_set_capability:
(master_row[0] != NULL))
{
rpl_global_gtid_slave_state.load(mi->io_thd, master_row[0],
- strlen(master_row[0]), false);
+ strlen(master_row[0]), false, false);
}
else if (check_io_slave_killed(mi->io_thd, mi, NULL))
goto slave_killed_err;
@@ -2288,8 +2361,8 @@ static bool send_show_master_info_header(THD *thd, bool full,
sizeof(mi->ssl_crl)));
field_list.push_back(new Item_empty_string("Master_SSL_Crlpath",
sizeof(mi->ssl_crlpath)));
- field_list.push_back(new Item_return_int("Using_Gtid", sizeof(ulong),
- MYSQL_TYPE_LONG));
+ field_list.push_back(new Item_empty_string("Using_Gtid",
+ sizeof("Current_Pos")-1));
if (full)
{
field_list.push_back(new Item_return_int("Retried_transactions",
@@ -2302,7 +2375,8 @@ static bool send_show_master_info_header(THD *thd, bool full,
10, MYSQL_TYPE_LONG));
field_list.push_back(new Item_float("Slave_heartbeat_period",
0.0, 3, 10));
- field_list.push_back(new Item_empty_string("Gtid_Pos", gtid_pos_length));
+ field_list.push_back(new Item_empty_string("Gtid_Slave_Pos",
+ gtid_pos_length));
}
if (protocol->send_result_set_metadata(&field_list,
@@ -2382,7 +2456,8 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
protocol->store(
mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
- "Relay"), &my_charset_bin);
+ ( mi->rli.until_condition==Relay_log_info::UNTIL_RELAY_POS? "Relay":
+ "Gtid")), &my_charset_bin);
protocol->store(mi->rli.until_log_name, &my_charset_bin);
protocol->store((ulonglong) mi->rli.until_log_pos);
@@ -2473,7 +2548,10 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
protocol->store(mi->ssl_ca, &my_charset_bin);
// Master_Ssl_Crlpath
protocol->store(mi->ssl_capath, &my_charset_bin);
- protocol->store((uint32) (mi->using_gtid != 0));
+ protocol->store((mi->using_gtid==Master_info::USE_GTID_NO ? "No" :
+ (mi->using_gtid==Master_info::USE_GTID_SLAVE_POS ?
+ "Slave_Pos" : "Current_Pos")),
+ &my_charset_bin);
if (full)
{
protocol->store((uint32) mi->rli.retried_trans);
@@ -3079,7 +3157,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
This tests if the position of the beginning of the current event
hits the UNTIL barrier.
*/
- if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
+ if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS ||
+ rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) &&
rli->is_until_satisfied(thd, ev))
{
char buf[22];
@@ -3976,7 +4055,8 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
saved_master_log_pos= rli->group_master_log_pos;
saved_skip= rli->slave_skip_counter;
}
- if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
+ if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS ||
+ rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) &&
rli->is_until_satisfied(thd, NULL))
{
char buf[22];
@@ -4815,7 +4895,43 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
break;
+ case GTID_LIST_EVENT:
+ {
+ const char *errmsg;
+ Gtid_list_log_event *glev;
+ Log_event *tmp;
+
+ if (mi->rli.until_condition != Relay_log_info::UNTIL_GTID)
+ goto default_action;
+ if (!(tmp= Log_event::read_log_event(buf, event_len, &errmsg,
+ mi->rli.relay_log.description_event_for_queue,
+ opt_slave_sql_verify_checksum)))
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ goto err;
+ }
+ glev= static_cast<Gtid_list_log_event *>(tmp);
+ if (glev->gl_flags & Gtid_list_log_event::FLAG_UNTIL_REACHED)
+ {
+ char str_buf[128];
+ String str(str_buf, sizeof(str_buf), system_charset_info);
+ mi->rli.until_gtid_pos.to_string(&str);
+ sql_print_information("Slave IO thread stops because it reached its"
+ " UNTIL master_gtid_pos %s", str.c_ptr_safe());
+ mi->abort_slave= true;
+ }
+ delete glev;
+
+ /*
+ Do not update position for fake Gtid_list event (which has a zero
+ end_log_pos).
+ */
+ inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
+ }
+ break;
+
default:
+ default_action:
inc_pos= event_len;
break;
}
diff --git a/sql/sql_join_cache.cc b/sql/sql_join_cache.cc
index 6b0882bda80..9fca8730cb5 100644
--- a/sql/sql_join_cache.cc
+++ b/sql/sql_join_cache.cc
@@ -3812,8 +3812,8 @@ uint JOIN_TAB_SCAN_MRR::aux_buffer_incr(ulong recno)
uint incr= 0;
TABLE_REF *ref= &join_tab->ref;
TABLE *tab= join_tab->table;
- uint rec_per_key=
- tab->key_info[ref->key].actual_rec_per_key(ref->key_parts-1);
+ ha_rows rec_per_key=
+ (ha_rows) tab->key_info[ref->key].actual_rec_per_key(ref->key_parts-1);
set_if_bigger(rec_per_key, 1);
if (recno == 1)
incr= ref->key_length + tab->file->ref_length;
diff --git a/sql/sql_lex.h b/sql/sql_lex.h
index 50afc657ee6..9cdc8d7d0b6 100644
--- a/sql/sql_lex.h
+++ b/sql/sql_lex.h
@@ -210,6 +210,8 @@ struct LEX_MASTER_INFO
char *ssl_crl, *ssl_crlpath;
char *relay_log_name;
LEX_STRING connection_name;
+ /* Value in START SLAVE UNTIL master_gtid_pos=xxx */
+ LEX_STRING gtid_pos_str;
ulonglong pos;
ulong relay_log_pos;
ulong server_id;
@@ -220,8 +222,10 @@ struct LEX_MASTER_INFO
changed variable or if it should be left at old value
*/
enum {LEX_MI_UNCHANGED, LEX_MI_DISABLE, LEX_MI_ENABLE}
- ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt,
- use_gtid_opt;
+ ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt;
+ enum {
+ LEX_GTID_UNCHANGED, LEX_GTID_NO, LEX_GTID_CURRENT_POS, LEX_GTID_SLAVE_POS
+ } use_gtid_opt;
void init()
{
@@ -237,7 +241,10 @@ struct LEX_MASTER_INFO
pos= relay_log_pos= server_id= port= connect_retry= 0;
heartbeat_period= 0;
ssl= ssl_verify_server_cert= heartbeat_opt=
- repl_ignore_server_ids_opt= use_gtid_opt= LEX_MI_UNCHANGED;
+ repl_ignore_server_ids_opt= LEX_MI_UNCHANGED;
+ gtid_pos_str.length= 0;
+ gtid_pos_str.str= NULL;
+ use_gtid_opt= LEX_GTID_UNCHANGED;
}
};
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 100d3c9fe85..9e3da1ce641 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -30,6 +30,14 @@
#include "rpl_handler.h"
#include "debug_sync.h"
+
+enum enum_gtid_until_state {
+ GTID_UNTIL_NOT_DONE,
+ GTID_UNTIL_STOP_AFTER_STANDALONE,
+ GTID_UNTIL_STOP_AFTER_TRANSACTION
+};
+
+
int max_binlog_dump_events = 0; // unlimited
my_bool opt_sporadic_binlog_dump_fail = 0;
#ifndef DBUG_OFF
@@ -38,6 +46,74 @@ static int binlog_dump_count = 0;
extern TYPELIB binlog_checksum_typelib;
+
+static int
+fake_event_header(String* packet, Log_event_type event_type, ulong extra_len,
+ my_bool *do_checksum, ha_checksum *crc, const char** errmsg,
+ uint8 checksum_alg_arg)
+{
+ char header[LOG_EVENT_HEADER_LEN];
+ ulong event_len;
+
+ *do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
+ checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
+
+ /*
+ 'when' (the timestamp) is set to 0 so that slave could distinguish between
+ real and fake Rotate events (if necessary)
+ */
+ memset(header, 0, 4);
+ header[EVENT_TYPE_OFFSET] = (uchar)event_type;
+ event_len= LOG_EVENT_HEADER_LEN + extra_len +
+ (*do_checksum ? BINLOG_CHECKSUM_LEN : 0);
+ int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
+ int4store(header + EVENT_LEN_OFFSET, event_len);
+ int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
+ // TODO: check what problems this may cause and fix them
+ int4store(header + LOG_POS_OFFSET, 0);
+ if (packet->append(header, sizeof(header)))
+ {
+ *errmsg= "Failed due to out-of-memory writing event";
+ return -1;
+ }
+ if (*do_checksum)
+ {
+ *crc= my_checksum(0L, NULL, 0);
+ *crc= my_checksum(*crc, (uchar*)header, sizeof(header));
+ }
+ return 0;
+}
+
+
+static int
+fake_event_footer(String *packet, my_bool do_checksum, ha_checksum crc, const char **errmsg)
+{
+ if (do_checksum)
+ {
+ char b[BINLOG_CHECKSUM_LEN];
+ int4store(b, crc);
+ if (packet->append(b, sizeof(b)))
+ {
+ *errmsg= "Failed due to out-of-memory writing event checksum";
+ return -1;
+ }
+ }
+ return 0;
+}
+
+
+static int
+fake_event_write(NET *net, String *packet, const char **errmsg)
+{
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+ {
+ *errmsg = "failed on my_net_write()";
+ return -1;
+ }
+ return 0;
+}
+
+
/*
fake_rotate_event() builds a fake (=which does not exist physically in any
binlog) Rotate event, which contains the name of the binlog we are going to
@@ -61,59 +137,71 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
uint8 checksum_alg_arg)
{
DBUG_ENTER("fake_rotate_event");
- char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
-
- /*
- this Rotate is to be sent with checksum if and only if
- slave's get_master_version_and_clock time handshake value
- of master's @@global.binlog_checksum was TRUE
- */
-
- my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
- checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
-
- /*
- 'when' (the timestamp) is set to 0 so that slave could distinguish between
- real and fake Rotate events (if necessary)
- */
- memset(header, 0, 4);
- header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
-
+ char buf[ROTATE_HEADER_LEN+100];
+ my_bool do_checksum;
+ int err;
char* p = log_file_name+dirname_length(log_file_name);
uint ident_len = (uint) strlen(p);
- ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN +
- (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
- int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
- int4store(header + EVENT_LEN_OFFSET, event_len);
- int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
+ ha_checksum crc;
- // TODO: check what problems this may cause and fix them
- int4store(header + LOG_POS_OFFSET, 0);
+ if ((err= fake_event_header(packet, ROTATE_EVENT,
+ ident_len + ROTATE_HEADER_LEN, &do_checksum, &crc,
+ errmsg, checksum_alg_arg)))
+ DBUG_RETURN(err);
- packet->append(header, sizeof(header));
int8store(buf+R_POS_OFFSET,position);
packet->append(buf, ROTATE_HEADER_LEN);
packet->append(p, ident_len);
if (do_checksum)
{
- char b[BINLOG_CHECKSUM_LEN];
- ha_checksum crc= my_checksum(0L, NULL, 0);
- crc= my_checksum(crc, (uchar*)header, sizeof(header));
crc= my_checksum(crc, (uchar*)buf, ROTATE_HEADER_LEN);
crc= my_checksum(crc, (uchar*)p, ident_len);
- int4store(b, crc);
- packet->append(b, sizeof(b));
}
- if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+ if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
+ (err= fake_event_write(net, packet, errmsg)))
+ DBUG_RETURN(err);
+
+ DBUG_RETURN(0);
+}
+
+
+static int fake_gtid_list_event(NET* net, String* packet,
+ Gtid_list_log_event *glev, const char** errmsg,
+ uint8 checksum_alg_arg)
+{
+ my_bool do_checksum;
+ int err;
+ ha_checksum crc;
+ char buf[128];
+ String str(buf, sizeof(buf), system_charset_info);
+
+ str.length(0);
+ if (glev->to_packet(&str))
{
- *errmsg = "failed on my_net_write()";
- DBUG_RETURN(-1);
+ *errmsg= "Failed due to out-of-memory writing Gtid_list event";
+ return -1;
}
- DBUG_RETURN(0);
+ if ((err= fake_event_header(packet, GTID_LIST_EVENT,
+ str.length(), &do_checksum, &crc,
+ errmsg, checksum_alg_arg)))
+ return err;
+
+ packet->append(str);
+ if (do_checksum)
+ {
+ crc= my_checksum(crc, (uchar*)str.ptr(), str.length());
+ }
+
+ if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
+ (err= fake_event_write(net, packet, errmsg)))
+ return err;
+
+ return 0;
}
+
/*
Reset thread transmit packet buffer for event sending
@@ -526,6 +614,40 @@ get_slave_connect_state(THD *thd, String *out_str)
}
+static bool
+get_slave_gtid_strict_mode(THD *thd)
+{
+ bool null_value;
+
+ const LEX_STRING name= { C_STRING_WITH_LEN("slave_gtid_strict_mode") };
+ user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
+ name.length);
+ return entry && entry->val_int(&null_value) && !null_value;
+}
+
+
+/*
+ Get the value of the @slave_until_gtid user variable into the supplied
+ String (this is the GTID position specified for START SLAVE UNTIL
+ master_gtid_pos='xxx').
+
+ Returns false if error (ie. slave did not set the variable and is not doing
+ START SLAVE UNTIL mater_gtid_pos='xxx'), true if success.
+*/
+static bool
+get_slave_until_gtid(THD *thd, String *out_str)
+{
+ bool null_value;
+
+ const LEX_STRING name= { C_STRING_WITH_LEN("slave_until_gtid") };
+ user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
+ name.length);
+ return entry && entry->val_str(&null_value, out_str, 0) && !null_value;
+}
+
+
/*
Function prepares and sends repliation heartbeat event.
@@ -767,16 +889,14 @@ contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev)
Give an error if the slave requests something that we do not have in our
binlog.
-
- T
*/
static int
check_slave_start_position(THD *thd, slave_connection_state *st,
- const char **errormsg, rpl_gtid *error_gtid)
+ const char **errormsg, rpl_gtid *error_gtid,
+ slave_connection_state *until_gtid_state)
{
uint32 i;
- bool found;
int err;
rpl_gtid **delete_list= NULL;
uint32 delete_idx= 0;
@@ -791,9 +911,9 @@ check_slave_start_position(THD *thd, slave_connection_state *st,
rpl_gtid master_replication_gtid;
rpl_gtid start_gtid;
- if ((found= mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
- slave_gtid->server_id,
- &master_gtid)) &&
+ if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
+ slave_gtid->server_id,
+ &master_gtid) &&
master_gtid.seq_no >= slave_gtid->seq_no)
continue;
@@ -814,6 +934,7 @@ check_slave_start_position(THD *thd, slave_connection_state *st,
slave_gtid->seq_no != master_replication_gtid.seq_no)
{
rpl_gtid domain_gtid;
+ rpl_gtid *until_gtid;
if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
&domain_gtid))
@@ -832,6 +953,27 @@ check_slave_start_position(THD *thd, slave_connection_state *st,
++missing_domains;
continue;
}
+
+ if (until_gtid_state &&
+ ( !(until_gtid= until_gtid_state->find(slave_gtid->domain_id)) ||
+ (mysql_bin_log.find_in_binlog_state(until_gtid->domain_id,
+ until_gtid->server_id,
+ &master_gtid) &&
+ master_gtid.seq_no >= until_gtid->seq_no)))
+ {
+ /*
+ The slave requested to start from a position that is not (yet) in
+ our binlog, but it also specified an UNTIL condition that _is_ in
+ our binlog (or a missing UNTIL, which means stop at the very
+ beginning). So the stop position is before the start position, and
+ we just delete the entry from the UNTIL hash to mark that this
+ domain has already reached the UNTIL condition.
+ */
+ if(until_gtid)
+ until_gtid_state->remove(until_gtid);
+ continue;
+ }
+
*errormsg= "Requested slave GTID state not found in binlog";
*error_gtid= *slave_gtid;
err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG;
@@ -951,7 +1093,8 @@ end:
the requested GTID that was already purged.
*/
static const char *
-gtid_find_binlog_file(slave_connection_state *state, char *out_name)
+gtid_find_binlog_file(slave_connection_state *state, char *out_name,
+ slave_connection_state *until_gtid_state)
{
MEM_ROOT memroot;
binlog_file_entry *list;
@@ -1003,42 +1146,60 @@ gtid_find_binlog_file(slave_connection_state *state, char *out_name)
if (!glev || contains_all_slave_gtid(state, glev))
{
- uint32 i;
-
strmake(out_name, buf, FN_REFLEN);
- /*
- As a special case, we allow to start from binlog file N if the
- requested GTID is the last event (in the corresponding domain) in
- binlog file (N-1), but then we need to remove that GTID from the slave
- state, rather than skipping events waiting for it to turn up.
- */
- for (i= 0; i < glev->count; ++i)
+ if (glev)
{
- const rpl_gtid *gtid= state->find(glev->list[i].domain_id);
- if (!gtid)
- {
- /*
- contains_all_slave_gtid() returns false if there is any domain in
- Gtid_list_event which is not in the requested slave position.
+ uint32 i;
- We may delete a domain from the slave state inside this loop, but
- we only do this when it is the very last GTID logged for that
- domain in earlier binlogs, and then we can not encounter it in any
- further GTIDs in the Gtid_list.
- */
- DBUG_ASSERT(0);
- continue;
- }
- if (gtid->server_id == glev->list[i].server_id &&
- gtid->seq_no == glev->list[i].seq_no)
+ /*
+ As a special case, we allow to start from binlog file N if the
+ requested GTID is the last event (in the corresponding domain) in
+ binlog file (N-1), but then we need to remove that GTID from the slave
+ state, rather than skipping events waiting for it to turn up.
+
+ If slave is doing START SLAVE UNTIL, check for any UNTIL conditions
+ that are already included in a previous binlog file. Delete any such
+ from the UNTIL hash, to mark that such domains have already reached
+ their UNTIL condition.
+ */
+ for (i= 0; i < glev->count; ++i)
{
- /*
- The slave requested to start from the very beginning of this
- domain in this binlog file. So delete the entry from the state,
- we do not need to skip anything.
- */
- state->remove(gtid);
+ const rpl_gtid *gtid= state->find(glev->list[i].domain_id);
+ if (!gtid)
+ {
+ /*
+ Contains_all_slave_gtid() returns false if there is any domain in
+ Gtid_list_event which is not in the requested slave position.
+
+ We may delete a domain from the slave state inside this loop, but
+ we only do this when it is the very last GTID logged for that
+ domain in earlier binlogs, and then we can not encounter it in any
+ further GTIDs in the Gtid_list.
+ */
+ DBUG_ASSERT(0);
+ } else if (gtid->server_id == glev->list[i].server_id &&
+ gtid->seq_no == glev->list[i].seq_no)
+ {
+ /*
+ The slave requested to start from the very beginning of this
+ domain in this binlog file. So delete the entry from the state,
+ we do not need to skip anything.
+ */
+ state->remove(gtid);
+ }
+
+ if (until_gtid_state &&
+ (gtid= until_gtid_state->find(glev->list[i].domain_id)) &&
+ gtid->server_id == glev->list[i].server_id &&
+ gtid->seq_no <= glev->list[i].seq_no)
+ {
+ /*
+ We've already reached the stop position in UNTIL for this domain,
+ since it is before the start position.
+ */
+ until_gtid_state->remove(gtid);
+ }
}
}
@@ -1163,6 +1324,7 @@ gtid_state_from_pos(const char *name, uint32 offset,
goto end;
}
status= Gtid_list_log_event::peek(packet.ptr(), packet.length(),
+ current_checksum_alg,
&gtid_list, &list_len);
if (status)
{
@@ -1256,6 +1418,49 @@ gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str)
}
+static bool
+is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
+ enum_gtid_until_state gtid_until_group,
+ Log_event_type event_type, uint8 current_checksum_alg,
+ ushort flags, const char **errmsg,
+ rpl_binlog_state *until_binlog_state)
+{
+ switch (gtid_until_group)
+ {
+ case GTID_UNTIL_NOT_DONE:
+ return false;
+ case GTID_UNTIL_STOP_AFTER_STANDALONE:
+ if (Log_event::is_part_of_group(event_type))
+ return false;
+ break;
+ case GTID_UNTIL_STOP_AFTER_TRANSACTION:
+ if (event_type != XID_EVENT &&
+ (event_type != QUERY_EVENT ||
+ !Query_log_event::peek_is_commit_rollback(packet->ptr()+*ev_offset,
+ packet->length()-*ev_offset,
+ current_checksum_alg)))
+ return false;
+ break;
+ }
+
+ /*
+ The last event group has been sent, now the START SLAVE UNTIL condition
+ has been reached.
+
+ Send a last fake Gtid_list_log_event with a flag set to mark that we
+ stop due to UNTIL condition.
+ */
+ if (reset_transmit_packet(thd, flags, ev_offset, errmsg))
+ return true;
+ Gtid_list_log_event glev(until_binlog_state,
+ Gtid_list_log_event::FLAG_UNTIL_REACHED);
+ if (fake_gtid_list_event(net, packet, &glev, errmsg, current_checksum_alg))
+ return true;
+ *errmsg= NULL;
+ return true;
+}
+
+
/*
Helper function for mysql_binlog_send() to write an event down the slave
connection.
@@ -1268,37 +1473,142 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
IO_CACHE *log, int mariadb_slave_capability,
ulong ev_offset, uint8 current_checksum_alg,
bool using_gtid_state, slave_connection_state *gtid_state,
- enum_gtid_skip_type *gtid_skip_group)
+ enum_gtid_skip_type *gtid_skip_group,
+ slave_connection_state *until_gtid_state,
+ enum_gtid_until_state *gtid_until_group,
+ rpl_binlog_state *until_binlog_state,
+ bool slave_gtid_strict_mode, rpl_gtid *error_gtid)
{
my_off_t pos;
size_t len= packet->length();
+ if (event_type == GTID_LIST_EVENT && using_gtid_state && until_gtid_state)
+ {
+ rpl_gtid *gtid_list;
+ uint32 list_len;
+ bool err;
+
+ if (ev_offset > len ||
+ Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
+ current_checksum_alg,
+ &gtid_list, &list_len))
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return "Failed to read Gtid_list_log_event: corrupt binlog";
+ }
+ err= until_binlog_state->load(gtid_list, list_len);
+ my_free(gtid_list);
+ if (err)
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return "Failed in internal GTID book-keeping: Out of memory";
+ }
+ }
+
/* Skip GTID event groups until we reach slave position within a domain_id. */
- if (event_type == GTID_EVENT && using_gtid_state && gtid_state->count() > 0)
+ if (event_type == GTID_EVENT && using_gtid_state)
{
- uint32 server_id, domain_id;
- uint64 seq_no;
uchar flags2;
rpl_gtid *gtid;
- if (ev_offset > len ||
- Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
- current_checksum_alg,
- &domain_id, &server_id, &seq_no, &flags2))
- return "Failed to read Gtid_log_event: corrupt binlog";
- gtid= gtid_state->find(domain_id);
- if (gtid != NULL)
+ if (gtid_state->count() > 0 || until_gtid_state)
{
- /* Skip this event group if we have not yet reached slave start pos. */
- if (server_id != gtid->server_id || seq_no <= gtid->seq_no)
- *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
- GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
- /*
- Delete this entry if we have reached slave start position (so we will
- not skip subsequent events and won't have to look them up and check).
- */
- if (server_id == gtid->server_id && seq_no >= gtid->seq_no)
- gtid_state->remove(gtid);
+ rpl_gtid event_gtid;
+
+ if (ev_offset > len ||
+ Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
+ current_checksum_alg,
+ &event_gtid.domain_id, &event_gtid.server_id,
+ &event_gtid.seq_no, &flags2))
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return "Failed to read Gtid_log_event: corrupt binlog";
+ }
+
+ if (until_binlog_state->update(&event_gtid, false))
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return "Failed in internal GTID book-keeping: Out of memory";
+ }
+
+ if (gtid_state->count() > 0)
+ {
+ gtid= gtid_state->find(event_gtid.domain_id);
+ if (gtid != NULL)
+ {
+ /* Skip this event group if we have not yet reached slave start pos. */
+ if (event_gtid.server_id != gtid->server_id ||
+ event_gtid.seq_no <= gtid->seq_no)
+ *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+ GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+ if (event_gtid.server_id == gtid->server_id &&
+ event_gtid.seq_no >= gtid->seq_no)
+ {
+ /*
+ In strict mode, it is an error if the slave requests to start in
+ a "hole" in the master's binlog: a GTID that does not exist, even
+ though both the prior and subsequent seq_no exists for same
+ domain_id and server_id.
+ */
+ if (slave_gtid_strict_mode && event_gtid.seq_no > gtid->seq_no)
+ {
+ my_errno= ER_GTID_START_FROM_BINLOG_HOLE;
+ *error_gtid= *gtid;
+ return "The binlog on the master is missing the GTID requested "
+ "by the slave (even though both a prior and a subsequent "
+ "sequence number does exist), and GTID strict mode is enabled.";
+ }
+ /*
+ Delete this entry if we have reached slave start position (so we
+ will not skip subsequent events and won't have to look them up
+ and check).
+ */
+ gtid_state->remove(gtid);
+ }
+ }
+ }
+
+ if (until_gtid_state)
+ {
+ gtid= until_gtid_state->find(event_gtid.domain_id);
+ if (gtid == NULL)
+ {
+ /*
+ This domain already reached the START SLAVE UNTIL stop condition,
+ so skip this event group.
+ */
+ *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+ GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+ }
+ else if (event_gtid.server_id == gtid->server_id &&
+ event_gtid.seq_no >= gtid->seq_no)
+ {
+ /*
+ We have reached the stop condition.
+ Delete this domain_id from the hash, so we will skip all further
+ events in this domain and eventually stop when all domains are
+ done.
+ */
+ uint64 until_seq_no= gtid->seq_no;
+ until_gtid_state->remove(gtid);
+ if (until_gtid_state->count() == 0)
+ *gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
+ GTID_UNTIL_STOP_AFTER_STANDALONE :
+ GTID_UNTIL_STOP_AFTER_TRANSACTION);
+ if (event_gtid.seq_no > until_seq_no)
+ {
+ /*
+ The GTID in START SLAVE UNTIL condition is missing in our binlog.
+ This should normally not happen (user error), but since we can be
+ sure that we are now beyond the position that the UNTIL condition
+ should be in, we can just stop now. And we also need to skip this
+ event group (as it is beyond the UNTIL condition).
+ */
+ *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+ GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+ }
+ }
+ }
}
}
@@ -1356,7 +1666,10 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
a no-operation on the slave.
*/
if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to replace row annotate event with dummy: too small event.";
+ }
}
}
@@ -1375,8 +1688,11 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
ev_offset,
current_checksum_alg);
if (err)
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to replace GTID event with backwards-compatible event: "
"currupt event.";
+ }
if (!need_dummy)
return NULL;
}
@@ -1403,8 +1719,11 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
binlog positions.
*/
if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to replace binlog checkpoint or gtid list event with "
"dummy: too small event.";
+ }
}
}
@@ -1428,24 +1747,37 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
pos= my_b_tell(log);
if (RUN_HOOK(binlog_transmit, before_send_event,
(thd, flags, packet, log_file_name, pos)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
return "run 'before_send_event' hook failed";
+ }
if (my_net_write(net, (uchar*) packet->ptr(), len))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
return "Failed on my_net_write()";
+ }
DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] ));
if (event_type == LOAD_EVENT)
{
if (send_file(thd))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
return "failed in send_file()";
+ }
}
if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
return "Failed to run hook 'after_send_event'";
+ }
return NULL; /* Success */
}
+
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
ushort flags)
{
@@ -1465,12 +1797,18 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
mysql_mutex_t *log_lock;
mysql_cond_t *log_cond;
int mariadb_slave_capability;
- char str_buf[256];
+ char str_buf[128];
String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
bool using_gtid_state;
- slave_connection_state gtid_state, return_gtid_state;
+ char str_buf2[128];
+ String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info);
+ slave_connection_state gtid_state, until_gtid_state_obj;
+ slave_connection_state *until_gtid_state= NULL;
rpl_gtid error_gtid;
enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT;
+ enum_gtid_until_state gtid_until_group= GTID_UNTIL_NOT_DONE;
+ rpl_binlog_state until_binlog_state;
+ bool slave_gtid_strict_mode;
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
@@ -1502,6 +1840,13 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
connect_gtid_state.length(0);
using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", using_gtid_state= false;);
+ if (using_gtid_state)
+ {
+ slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd);
+ if(get_slave_until_gtid(thd, &slave_until_gtid_str))
+ until_gtid_state= &until_gtid_state_obj;
+ }
+
/*
We want to corrupt the first event, in Log_event::read_log_event().
But we do not want the corruption to happen early, eg. when client does
@@ -1557,13 +1902,23 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
+ if (until_gtid_state &&
+ until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(),
+ slave_until_gtid_str.length()))
+ {
+ errmsg= "Out of memory or malformed slave request when obtaining UNTIL "
+ "position sent from slave";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
if ((error= check_slave_start_position(thd, &gtid_state, &errmsg,
- &error_gtid)))
+ &error_gtid, until_gtid_state)))
{
my_errno= error;
goto err;
}
- if ((errmsg= gtid_find_binlog_file(&gtid_state, search_file_name)))
+ if ((errmsg= gtid_find_binlog_file(&gtid_state, search_file_name,
+ until_gtid_state)))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
@@ -1753,6 +2108,15 @@ impossible position";
/* The Format_description_log_event event will be found naturally. */
}
+ /*
+ Handle the case of START SLAVE UNTIL with an UNTIL condition already
+ fulfilled at the start position.
+
+ We will send one event, the format_description, and then stop.
+ */
+ if (until_gtid_state && until_gtid_state->count() == 0)
+ gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
+
/* seek to the requested position, to start the requested dump */
my_b_seek(&log, pos); // Seek will done on next read
@@ -1833,12 +2197,26 @@ impossible position";
log_file_name, &log,
mariadb_slave_capability, ev_offset,
current_checksum_alg, using_gtid_state,
- &gtid_state, &gtid_skip_group)))
+ &gtid_state, &gtid_skip_group,
+ until_gtid_state, &gtid_until_group,
+ &until_binlog_state,
+ slave_gtid_strict_mode, &error_gtid)))
{
errmsg= tmp_msg;
- my_errno= ER_UNKNOWN_ERROR;
goto err;
}
+ if (until_gtid_state &&
+ is_until_reached(thd, net, packet, &ev_offset, gtid_until_group,
+ event_type, current_checksum_alg, flags, &errmsg,
+ &until_binlog_state))
+ {
+ if (errmsg)
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ goto end;
+ }
DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
{
@@ -1950,6 +2328,8 @@ impossible position";
thd->ENTER_COND(log_cond, log_lock,
&stage_master_has_sent_all_binlog_to_slave,
&old_stage);
+ if (thd->killed)
+ break;
ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
DBUG_ASSERT(ret == 0 || (heartbeat_period != 0));
if (ret == ETIMEDOUT || ret == ETIME)
@@ -1981,7 +2361,7 @@ impossible position";
{
DBUG_PRINT("wait",("binary log received update or a broadcast signal caught"));
}
- } while (signal_cnt == mysql_bin_log.signal_cnt && !thd->killed);
+ } while (signal_cnt == mysql_bin_log.signal_cnt);
thd->EXIT_COND(&old_stage);
}
break;
@@ -1992,18 +2372,34 @@ impossible position";
goto err;
}
- if (read_packet &&
- (tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
- log_file_name, &log,
- mariadb_slave_capability, ev_offset,
- current_checksum_alg,
- using_gtid_state, &gtid_state,
- &gtid_skip_group)))
+ if (read_packet)
{
- errmsg= tmp_msg;
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
+ if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
+ log_file_name, &log,
+ mariadb_slave_capability, ev_offset,
+ current_checksum_alg,
+ using_gtid_state, &gtid_state,
+ &gtid_skip_group, until_gtid_state,
+ &gtid_until_group, &until_binlog_state,
+ slave_gtid_strict_mode, &error_gtid)))
+ {
+ errmsg= tmp_msg;
+ goto err;
+ }
+ if (
+ until_gtid_state &&
+ is_until_reached(thd, net, packet, &ev_offset, gtid_until_group,
+ event_type, current_checksum_alg, flags, &errmsg,
+ &until_binlog_state))
+ {
+ if (errmsg)
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ goto end;
+ }
+ }
log.error=0;
}
@@ -2099,6 +2495,17 @@ err:
/* Use this error code so slave will know not to try reconnect. */
my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
}
+ else if (my_errno == ER_GTID_START_FROM_BINLOG_HOLE)
+ {
+ my_snprintf(error_text, sizeof(error_text),
+ "The binlog on the master is missing the GTID %u-%u-%llu "
+ "requested by the slave (even though both a prior and a "
+ "subsequent sequence number does exist), and GTID strict mode "
+ "is enabled",
+ error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no);
+ /* Use this error code so slave will know not to try reconnect. */
+ my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ }
else if (my_errno == ER_CANNOT_LOAD_SLAVE_GTID_STATE)
{
my_snprintf(error_text, sizeof(error_text),
@@ -2167,6 +2574,26 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
lock_slave_threads(mi); // this allows us to cleanly read slave_running
// Get a mask of _stopped_ threads
init_thread_mask(&thread_mask,mi,1 /* inverse */);
+
+ if (thd->lex->mi.gtid_pos_str.str)
+ {
+ if (thread_mask != (SLAVE_IO|SLAVE_SQL))
+ {
+ slave_errno= ER_SLAVE_WAS_RUNNING;
+ goto err;
+ }
+ if (thd->lex->slave_thd_opt)
+ {
+ slave_errno= ER_BAD_SLAVE_UNTIL_COND;
+ goto err;
+ }
+ if (mi->using_gtid == Master_info::USE_GTID_NO)
+ {
+ slave_errno= ER_UNTIL_REQUIRES_USING_GTID;
+ goto err;
+ }
+ }
+
/*
Below we will start all stopped threads. But if the user wants to
start only one thread, do as if the other thread was running (as we
@@ -2213,10 +2640,22 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
sizeof(mi->rli.until_log_name)-1);
}
+ else if (thd->lex->mi.gtid_pos_str.str)
+ {
+ if (mi->rli.until_gtid_pos.load(thd->lex->mi.gtid_pos_str.str,
+ thd->lex->mi.gtid_pos_str.length))
+ {
+ slave_errno= ER_INCORRECT_GTID_STATE;
+ mysql_mutex_unlock(&mi->rli.data_lock);
+ goto err;
+ }
+ mi->rli.until_condition= Relay_log_info::UNTIL_GTID;
+ }
else
mi->rli.clear_until_condition();
- if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
+ if (mi->rli.until_condition == Relay_log_info::UNTIL_MASTER_POS ||
+ mi->rli.until_condition == Relay_log_info::UNTIL_RELAY_POS)
{
/* Preparing members for effective until condition checking */
const char *p= fn_ext(mi->rli.until_log_name);
@@ -2239,7 +2678,10 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
/* mark the cached result of the UNTIL comparison as "undefined" */
mi->rli.until_log_names_cmp_result=
Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
+ }
+ if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
+ {
/* Issuing warning then started without --skip-slave-start */
if (!opt_skip_slave_start)
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
@@ -2271,6 +2713,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
ER(ER_SLAVE_WAS_RUNNING));
}
+err:
unlock_slave_threads(mi);
if (slave_errno)
@@ -2749,12 +3192,14 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
}
- if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_MI_ENABLE)
- mi->using_gtid= true;
- else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_MI_DISABLE ||
+ if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_SLAVE_POS)
+ mi->using_gtid= Master_info::USE_GTID_SLAVE_POS;
+ else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_CURRENT_POS)
+ mi->using_gtid= Master_info::USE_GTID_CURRENT_POS;
+ else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_NO ||
lex_mi->log_file_name || lex_mi->pos ||
lex_mi->relay_log_name || lex_mi->relay_log_pos)
- mi->using_gtid= false;
+ mi->using_gtid= Master_info::USE_GTID_NO;
/*
If user did specify neither host nor port nor any log name nor any log
@@ -2810,7 +3255,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
goto err;
}
- if (mi->using_gtid)
+ if (mi->using_gtid != Master_info::USE_GTID_NO)
{
/*
Clear the position in the master binlogs, so that we request the
@@ -3293,7 +3738,7 @@ int log_loaded_block(IO_CACHE* file)
/**
- Initialise the slave replication state from the mysql.rpl_slave_state table.
+ Initialise the slave replication state from the mysql.gtid_slave_pos table.
This is called each time an SQL thread starts, but the data is only actually
loaded on the first call.
@@ -3305,7 +3750,7 @@ int log_loaded_block(IO_CACHE* file)
The one containing the current slave state is the one with the maximal
sub_id value, within each domain_id.
- CREATE TABLE mysql.rpl_slave_state (
+ CREATE TABLE mysql.gtid_slave_pos (
domain_id INT UNSIGNED NOT NULL,
sub_id BIGINT UNSIGNED NOT NULL,
server_id INT UNSIGNED NOT NULL,
@@ -3341,7 +3786,7 @@ rpl_append_gtid_state(String *dest, bool use_binlog)
rpl_gtid *gtid_list= NULL;
uint32 num_gtids= 0;
- if (opt_bin_log &&
+ if (use_binlog && opt_bin_log &&
(err= mysql_bin_log.get_most_recent_gtid_list(&gtid_list, &num_gtids)))
return err;
@@ -3353,9 +3798,10 @@ rpl_append_gtid_state(String *dest, bool use_binlog)
bool
-rpl_gtid_pos_check(char *str, size_t len)
+rpl_gtid_pos_check(THD *thd, char *str, size_t len)
{
slave_connection_state tmp_slave_state;
+ bool gave_conflict_warning= false, gave_missing_warning= false;
/* Check that we can parse the supplied string. */
if (tmp_slave_state.load(str, len))
@@ -3390,18 +3836,43 @@ rpl_gtid_pos_check(char *str, size_t len)
continue;
if (!(slave_gtid= tmp_slave_state.find(binlog_gtid->domain_id)))
{
- my_error(ER_MASTER_GTID_POS_MISSING_DOMAIN, MYF(0),
- binlog_gtid->domain_id, binlog_gtid->domain_id,
- binlog_gtid->server_id, binlog_gtid->seq_no);
- break;
+ if (opt_gtid_strict_mode)
+ {
+ my_error(ER_MASTER_GTID_POS_MISSING_DOMAIN, MYF(0),
+ binlog_gtid->domain_id, binlog_gtid->domain_id,
+ binlog_gtid->server_id, binlog_gtid->seq_no);
+ break;
+ }
+ else if (!gave_missing_warning)
+ {
+ push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ ER_MASTER_GTID_POS_MISSING_DOMAIN,
+ ER(ER_MASTER_GTID_POS_MISSING_DOMAIN),
+ binlog_gtid->domain_id, binlog_gtid->domain_id,
+ binlog_gtid->server_id, binlog_gtid->seq_no);
+ gave_missing_warning= true;
+ }
}
- if (slave_gtid->seq_no < binlog_gtid->seq_no)
+ else if (slave_gtid->seq_no < binlog_gtid->seq_no)
{
- my_error(ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG, MYF(0),
- slave_gtid->domain_id, slave_gtid->server_id,
- slave_gtid->seq_no, binlog_gtid->domain_id,
- binlog_gtid->server_id, binlog_gtid->seq_no);
- break;
+ if (opt_gtid_strict_mode)
+ {
+ my_error(ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG, MYF(0),
+ slave_gtid->domain_id, slave_gtid->server_id,
+ slave_gtid->seq_no, binlog_gtid->domain_id,
+ binlog_gtid->server_id, binlog_gtid->seq_no);
+ break;
+ }
+ else if (!gave_conflict_warning)
+ {
+ push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG,
+ ER(ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG),
+ slave_gtid->domain_id, slave_gtid->server_id,
+ slave_gtid->seq_no, binlog_gtid->domain_id,
+ binlog_gtid->server_id, binlog_gtid->seq_no);
+ gave_conflict_warning= true;
+ }
}
}
my_free(binlog_gtid_list);
@@ -3416,7 +3887,7 @@ rpl_gtid_pos_check(char *str, size_t len)
bool
rpl_gtid_pos_update(THD *thd, char *str, size_t len)
{
- if (rpl_global_gtid_slave_state.load(thd, str, len, true))
+ if (rpl_global_gtid_slave_state.load(thd, str, len, true, true))
{
my_error(ER_FAILED_GTID_STATE_INIT, MYF(0));
return true;
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index 3af8f721bd7..820ffed0928 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -70,7 +70,7 @@ void rpl_init_gtid_slave_state();
void rpl_deinit_gtid_slave_state();
int gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str);
int rpl_append_gtid_state(String *dest, bool use_binlog);
-bool rpl_gtid_pos_check(char *str, size_t len);
+bool rpl_gtid_pos_check(THD *thd, char *str, size_t len);
bool rpl_gtid_pos_update(THD *thd, char *str, size_t len);
#endif /* HAVE_REPLICATION */
diff --git a/sql/sql_show.cc b/sql/sql_show.cc
index 48fb926bfcc..ba36c0fc63f 100644
--- a/sql/sql_show.cc
+++ b/sql/sql_show.cc
@@ -5977,8 +5977,8 @@ static int get_schema_stat_record(THD *thd, TABLE_LIST *tables,
KEY *key=show_table->key_info+i;
if (key->rec_per_key[j])
{
- ha_rows records=((double) show_table->stat_records() /
- key->actual_rec_per_key(j));
+ ha_rows records= (ha_rows) ((double) show_table->stat_records() /
+ key->actual_rec_per_key(j));
table->field[9]->store((longlong) records, TRUE);
table->field[9]->set_notnull();
}
diff --git a/sql/sql_statistics.cc b/sql/sql_statistics.cc
index f355f2c7760..2e2886a1d3f 100644
--- a/sql/sql_statistics.cc
+++ b/sql/sql_statistics.cc
@@ -337,7 +337,7 @@ protected:
void store_record_for_lookup()
{
- store_record(stat_table, record[0]);
+ DBUG_ASSERT(record[0] == stat_table->record[0]);
}
bool update_record()
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index 447ccf5f469..6be7c23fb0d 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -1026,6 +1026,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
%token CUBE_SYM /* SQL-2003-R */
%token CURDATE /* MYSQL-FUNC */
%token CURRENT_USER /* SQL-2003-R */
+%token CURRENT_POS_SYM
%token CURSOR_SYM /* SQL-2003-R */
%token CURSOR_NAME_SYM /* SQL-2003-N */
%token CURTIME /* MYSQL-FUNC */
@@ -1205,7 +1206,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
%token LOW_PRIORITY
%token LT /* OPERATOR */
%token MASTER_CONNECT_RETRY_SYM
-%token MASTER_USE_GTID_SYM
+%token MASTER_GTID_POS_SYM
%token MASTER_HOST_SYM
%token MASTER_LOG_FILE_SYM
%token MASTER_LOG_POS_SYM
@@ -1223,6 +1224,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
%token MASTER_SSL_VERIFY_SERVER_CERT_SYM
%token MASTER_SYM
%token MASTER_USER_SYM
+%token MASTER_USE_GTID_SYM
%token MASTER_HEARTBEAT_PERIOD_SYM
%token MATCH /* SQL-2003-R */
%token MAX_CONNECTIONS_PER_HOUR
@@ -1406,6 +1408,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
%token SIMPLE_SYM /* SQL-2003-N */
%token SLAVE
%token SLAVES
+%token SLAVE_POS_SYM
%token SLOW
%token SMALLINT /* SQL-2003-R */
%token SNAPSHOT_SYM
@@ -2191,15 +2194,34 @@ master_file_def:
/* Adjust if < BIN_LOG_HEADER_SIZE (same comment as Lex->mi.pos) */
Lex->mi.relay_log_pos = max(BIN_LOG_HEADER_SIZE, Lex->mi.relay_log_pos);
}
- | MASTER_USE_GTID_SYM EQ ulong_num
+ | MASTER_USE_GTID_SYM EQ CURRENT_POS_SYM
{
- if (Lex->mi.use_gtid_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
+ if (Lex->mi.use_gtid_opt != LEX_MASTER_INFO::LEX_GTID_UNCHANGED)
{
my_error(ER_DUP_ARGUMENT, MYF(0), "MASTER_use_gtid");
MYSQL_YYABORT;
}
- Lex->mi.use_gtid_opt= $3 ?
- LEX_MASTER_INFO::LEX_MI_ENABLE : LEX_MASTER_INFO::LEX_MI_DISABLE;
+ Lex->mi.use_gtid_opt= LEX_MASTER_INFO::LEX_GTID_CURRENT_POS;
+ }
+ ;
+ | MASTER_USE_GTID_SYM EQ SLAVE_POS_SYM
+ {
+ if (Lex->mi.use_gtid_opt != LEX_MASTER_INFO::LEX_GTID_UNCHANGED)
+ {
+ my_error(ER_DUP_ARGUMENT, MYF(0), "MASTER_use_gtid");
+ MYSQL_YYABORT;
+ }
+ Lex->mi.use_gtid_opt= LEX_MASTER_INFO::LEX_GTID_SLAVE_POS;
+ }
+ ;
+ | MASTER_USE_GTID_SYM EQ NO_SYM
+ {
+ if (Lex->mi.use_gtid_opt != LEX_MASTER_INFO::LEX_GTID_UNCHANGED)
+ {
+ my_error(ER_DUP_ARGUMENT, MYF(0), "MASTER_use_gtid");
+ MYSQL_YYABORT;
+ }
+ Lex->mi.use_gtid_opt= LEX_MASTER_INFO::LEX_GTID_NO;
}
;
@@ -7378,6 +7400,10 @@ slave_until:
MYSQL_YYABORT;
}
}
+ | UNTIL_SYM MASTER_GTID_POS_SYM EQ TEXT_STRING_sys
+ {
+ Lex->mi.gtid_pos_str = $4;
+ }
;
slave_until_opts:
@@ -13412,6 +13438,7 @@ keyword_sp:
| CONSTRAINT_NAME_SYM {}
| CONTEXT_SYM {}
| CONTRIBUTORS_SYM {}
+ | CURRENT_POS_SYM {}
| CPU_SYM {}
| CUBE_SYM {}
| CURSOR_NAME_SYM {}
@@ -13489,12 +13516,13 @@ keyword_sp:
| MAX_ROWS {}
| MASTER_SYM {}
| MASTER_HEARTBEAT_PERIOD_SYM {}
- | MASTER_USE_GTID_SYM {}
+ | MASTER_GTID_POS_SYM {}
| MASTER_HOST_SYM {}
| MASTER_PORT_SYM {}
| MASTER_LOG_FILE_SYM {}
| MASTER_LOG_POS_SYM {}
| MASTER_USER_SYM {}
+ | MASTER_USE_GTID_SYM {}
| MASTER_PASSWORD_SYM {}
| MASTER_SERVER_ID_SYM {}
| MASTER_CONNECT_RETRY_SYM {}
@@ -13599,6 +13627,7 @@ keyword_sp:
| SIMPLE_SYM {}
| SHARE_SYM {}
| SHUTDOWN {}
+ | SLAVE_POS_SYM {}
| SLOW {}
| SNAPSHOT_SYM {}
| SOFT_SYM {}
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 8c5a6c92a6b..3c78901addf 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1345,6 +1345,26 @@ static Sys_var_uint Sys_gtid_domain_id(
BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG,
ON_CHECK(check_has_super));
+
+static bool check_gtid_seq_no(sys_var *self, THD *thd, set_var *var)
+{
+ uint32 domain_id, server_id;
+ uint64_t seq_no;
+
+ if (check_has_super(self, thd, var))
+ return true;
+ domain_id= thd->variables.gtid_domain_id;
+ server_id= thd->variables.server_id;
+ seq_no= (uint64)var->value->val_uint();
+ DBUG_EXECUTE_IF("ignore_set_gtid_seq_no_check", return 0;);
+ if (opt_gtid_strict_mode && opt_bin_log &&
+ mysql_bin_log.check_strict_gtid_sequence(domain_id, server_id, seq_no))
+ return true;
+
+ return false;
+}
+
+
static Sys_var_ulonglong Sys_gtid_seq_no(
"gtid_seq_no",
"Internal server usage, for replication with global transaction id. "
@@ -1354,12 +1374,65 @@ static Sys_var_ulonglong Sys_gtid_seq_no(
SESSION_ONLY(gtid_seq_no),
NO_CMD_LINE, VALID_RANGE(0, ULONGLONG_MAX), DEFAULT(0),
BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG,
- ON_CHECK(check_has_super));
+ ON_CHECK(check_gtid_seq_no));
#ifdef HAVE_REPLICATION
+static unsigned char opt_gtid_binlog_pos_dummy;
+static Sys_var_gtid_binlog_pos Sys_gtid_binlog_pos(
+ "gtid_binlog_pos", "Last GTID logged to the binary log, per replication"
+ "domain",
+ READ_ONLY GLOBAL_VAR(opt_gtid_binlog_pos_dummy), NO_CMD_LINE);
+
+
+uchar *
+Sys_var_gtid_binlog_pos::global_value_ptr(THD *thd, LEX_STRING *base)
+{
+ char buf[128];
+ String str(buf, sizeof(buf), system_charset_info);
+ char *p;
+
+ str.length(0);
+ if ((opt_bin_log && mysql_bin_log.append_state_pos(&str)) ||
+ !(p= thd->strmake(str.ptr(), str.length())))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return NULL;
+ }
+
+ return (uchar *)p;
+}
+
+
+static unsigned char opt_gtid_current_pos_dummy;
+static Sys_var_gtid_current_pos Sys_gtid_current_pos(
+ "gtid_current_pos", "Current GTID position of the server. Per "
+ "replication domain, this is either the last GTID replicated by a "
+ "slave thread, or the GTID logged to the binary log, whichever is "
+ "most recent.",
+ READ_ONLY GLOBAL_VAR(opt_gtid_current_pos_dummy), NO_CMD_LINE);
+
+
+uchar *
+Sys_var_gtid_current_pos::global_value_ptr(THD *thd, LEX_STRING *base)
+{
+ String str;
+ char *p;
+
+ str.length(0);
+ if (rpl_append_gtid_state(&str, true) ||
+ !(p= thd->strmake(str.ptr(), str.length())))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return NULL;
+ }
+
+ return (uchar *)p;
+}
+
+
bool
-Sys_var_gtid_pos::do_check(THD *thd, set_var *var)
+Sys_var_gtid_slave_pos::do_check(THD *thd, set_var *var)
{
String str, *res;
bool running;
@@ -1372,7 +1445,12 @@ Sys_var_gtid_pos::do_check(THD *thd, set_var *var)
return true;
if (!(res= var->value->val_str(&str)))
return true;
- if (rpl_gtid_pos_check(&((*res)[0]), res->length()))
+ if (thd->in_active_multi_stmt_transaction())
+ {
+ my_error(ER_CANT_DO_THIS_DURING_AN_TRANSACTION, MYF(0));
+ return true;
+ }
+ if (rpl_gtid_pos_check(thd, &((*res)[0]), res->length()))
return true;
if (!(var->save_result.string_value.str=
@@ -1387,7 +1465,7 @@ Sys_var_gtid_pos::do_check(THD *thd, set_var *var)
bool
-Sys_var_gtid_pos::global_update(THD *thd, set_var *var)
+Sys_var_gtid_slave_pos::global_update(THD *thd, set_var *var)
{
bool err;
@@ -1413,13 +1491,13 @@ Sys_var_gtid_pos::global_update(THD *thd, set_var *var)
uchar *
-Sys_var_gtid_pos::global_value_ptr(THD *thd, LEX_STRING *base)
+Sys_var_gtid_slave_pos::global_value_ptr(THD *thd, LEX_STRING *base)
{
String str;
char *p;
str.length(0);
- if (rpl_append_gtid_state(&str, true) ||
+ if (rpl_append_gtid_state(&str, false) ||
!(p= thd->strmake(str.ptr(), str.length())))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
@@ -1430,14 +1508,21 @@ Sys_var_gtid_pos::global_value_ptr(THD *thd, LEX_STRING *base)
}
-static unsigned char opt_gtid_pos_dummy;
-static Sys_var_gtid_pos Sys_gtid_pos(
- "gtid_pos",
+static unsigned char opt_gtid_slave_pos_dummy;
+static Sys_var_gtid_slave_pos Sys_gtid_slave_pos(
+ "gtid_slave_pos",
"The list of global transaction IDs that were last replicated on the "
- "server, one for each replication domain. This defines where a slave "
- "starts replicating from on a master when connecting with global "
- "transaction ID.",
- GLOBAL_VAR(opt_gtid_pos_dummy), NO_CMD_LINE);
+ "server, one for each replication domain.",
+ GLOBAL_VAR(opt_gtid_slave_pos_dummy), NO_CMD_LINE);
+
+
+static Sys_var_mybool Sys_gtid_strict_mode(
+ "gtid_strict_mode",
+ "Enforce strict seq_no ordering of events in the binary log. Slave "
+ "stops with an error if it encounters an event that would cause it to "
+ "generate an out-of-order binlog if executed.",
+ GLOBAL_VAR(opt_gtid_strict_mode),
+ CMD_LINE(OPT_ARG), DEFAULT(FALSE));
#endif
diff --git a/sql/sys_vars.h b/sql/sys_vars.h
index 4a38b41ab9b..bf17040e65c 100644
--- a/sql/sys_vars.h
+++ b/sql/sys_vars.h
@@ -2049,12 +2049,114 @@ public:
/**
- Class for @@global.gtid_pos.
+ Class for @@global.gtid_current_pos.
*/
-class Sys_var_gtid_pos: public sys_var
+class Sys_var_gtid_current_pos: public sys_var
{
public:
- Sys_var_gtid_pos(const char *name_arg,
+ Sys_var_gtid_current_pos(const char *name_arg,
+ const char *comment, int flag_args, ptrdiff_t off, size_t size,
+ CMD_LINE getopt)
+ : sys_var(&all_sys_vars, name_arg, comment, flag_args, off, getopt.id,
+ getopt.arg_type, SHOW_CHAR, 0, NULL, VARIABLE_NOT_IN_BINLOG,
+ NULL, NULL, NULL)
+ {
+ option.var_type= GET_STR;
+ }
+ bool do_check(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ return true;
+ }
+ bool session_update(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ return true;
+ }
+ bool global_update(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ return true;
+ }
+ bool check_update_type(Item_result type) {
+ DBUG_ASSERT(false);
+ return false;
+ }
+ void session_save_default(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ }
+ void global_save_default(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ }
+ uchar *session_value_ptr(THD *thd, LEX_STRING *base)
+ {
+ DBUG_ASSERT(false);
+ return NULL;
+ }
+ uchar *global_value_ptr(THD *thd, LEX_STRING *base);
+};
+
+
+/**
+ Class for @@global.gtid_binlog_pos.
+*/
+class Sys_var_gtid_binlog_pos: public sys_var
+{
+public:
+ Sys_var_gtid_binlog_pos(const char *name_arg,
+ const char *comment, int flag_args, ptrdiff_t off, size_t size,
+ CMD_LINE getopt)
+ : sys_var(&all_sys_vars, name_arg, comment, flag_args, off, getopt.id,
+ getopt.arg_type, SHOW_CHAR, 0, NULL, VARIABLE_NOT_IN_BINLOG,
+ NULL, NULL, NULL)
+ {
+ option.var_type= GET_STR;
+ }
+ bool do_check(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ return true;
+ }
+ bool session_update(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ return true;
+ }
+ bool global_update(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ return true;
+ }
+ bool check_update_type(Item_result type) {
+ DBUG_ASSERT(false);
+ return false;
+ }
+ void session_save_default(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ }
+ void global_save_default(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ }
+ uchar *session_value_ptr(THD *thd, LEX_STRING *base)
+ {
+ DBUG_ASSERT(false);
+ return NULL;
+ }
+ uchar *global_value_ptr(THD *thd, LEX_STRING *base);
+};
+
+
+/**
+ Class for @@global.gtid_slave_pos.
+*/
+class Sys_var_gtid_slave_pos: public sys_var
+{
+public:
+ Sys_var_gtid_slave_pos(const char *name_arg,
const char *comment, int flag_args, ptrdiff_t off, size_t size,
CMD_LINE getopt)
: sys_var(&all_sys_vars, name_arg, comment, flag_args, off, getopt.id,