diff options
Diffstat (limited to 'sql/rpl_gtid.cc')
-rw-r--r-- | sql/rpl_gtid.cc | 364 |
1 files changed, 285 insertions, 79 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index cc6c169c0df..b34942ba091 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -16,24 +16,24 @@ /* Definitions for MariaDB global transaction ID (GTID). */ -#include <my_global.h> +#include "mariadb.h" #include "sql_priv.h" -#include "my_sys.h" #include "unireg.h" -#include "my_global.h" +#include "mariadb.h" #include "sql_base.h" #include "sql_parse.h" #include "key.h" #include "rpl_gtid.h" #include "rpl_rli.h" +#include "slave.h" #include "log_event.h" -const LEX_STRING rpl_gtid_slave_state_table_name= - { C_STRING_WITH_LEN("gtid_slave_pos") }; +const LEX_CSTRING rpl_gtid_slave_state_table_name= + { STRING_WITH_LEN("gtid_slave_pos") }; void -rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, +rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton, rpl_group_info *rgi) { int err; @@ -45,7 +45,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, it is even committed. */ mysql_mutex_lock(&LOCK_slave_state); - err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rgi); + err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, hton, rgi); mysql_mutex_unlock(&LOCK_slave_state); if (err) { @@ -74,12 +74,14 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi) if (rgi->gtid_pending) { uint64 sub_id= rgi->gtid_sub_id; + void *hton= NULL; + rgi->gtid_pending= false; if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE) { - if (record_gtid(thd, &rgi->current_gtid, sub_id, NULL, false)) + if (record_gtid(thd, &rgi->current_gtid, sub_id, NULL, false, &hton)) DBUG_RETURN(1); - update_state_hash(sub_id, &rgi->current_gtid, rgi); + update_state_hash(sub_id, &rgi->current_gtid, hton, rgi); } rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL; } @@ -165,9 +167,8 @@ rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi) break; } thd= rgi->thd; - if (thd->check_killed()) 
+ if (unlikely(thd->check_killed())) { - thd->send_kill_message(); res= -1; break; } @@ -243,7 +244,7 @@ rpl_slave_state_free_element(void *arg) rpl_slave_state::rpl_slave_state() - : last_sub_id(0), loaded(false) + : last_sub_id(0), gtid_pos_tables(0), loaded(false) { mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state, MY_MUTEX_INIT_SLOW); @@ -255,6 +256,7 @@ rpl_slave_state::rpl_slave_state() rpl_slave_state::~rpl_slave_state() { + free_gtid_pos_tables((struct gtid_pos_table *)gtid_pos_tables); truncate_hash(); my_hash_free(&hash); delete_dynamic(>id_sort_array); @@ -286,11 +288,12 @@ rpl_slave_state::truncate_hash() int rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, - uint64 seq_no, rpl_group_info *rgi) + uint64 seq_no, void *hton, rpl_group_info *rgi) { element *elem= NULL; list_element *list_elem= NULL; + DBUG_ASSERT(hton || !loaded); if (!(elem= get_element(domain_id))) return 1; @@ -313,7 +316,7 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, { if (rgi->gtid_ignore_duplicate_state==rpl_group_info::GTID_DUPLICATE_OWNER) { -#ifndef DBUG_OFF +#ifdef DBUG_ASSERT_EXISTS Relay_log_info *rli= rgi->rli; #endif uint32 count= elem->owner_count; @@ -339,6 +342,7 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, list_elem->server_id= server_id; list_elem->sub_id= sub_id; list_elem->seq_no= seq_no; + list_elem->hton= hton; elem->add(list_elem); if (last_sub_id < sub_id) @@ -409,10 +413,7 @@ rpl_slave_state::truncate_state_table(THD *thd) int err= 0; tmp_disable_binlog(thd); - tlist.init_one_table(STRING_WITH_LEN("mysql"), - rpl_gtid_slave_state_table_name.str, - rpl_gtid_slave_state_table_name.length, - NULL, TL_WRITE); + tlist.init_one_table(&MYSQL_SCHEMA_NAME, &rpl_gtid_slave_state_table_name, NULL, TL_WRITE); if (!(err= open_and_lock_tables(thd, &tlist, FALSE, 0))) { tdc_remove_table(thd, TDC_RT_REMOVE_UNUSED, "mysql", @@ -440,17 +441,17 @@ 
rpl_slave_state::truncate_state_table(THD *thd) static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= { - { { C_STRING_WITH_LEN("domain_id") }, - { C_STRING_WITH_LEN("int(10) unsigned") }, + { { STRING_WITH_LEN("domain_id") }, + { STRING_WITH_LEN("int(10) unsigned") }, {NULL, 0} }, - { { C_STRING_WITH_LEN("sub_id") }, - { C_STRING_WITH_LEN("bigint(20) unsigned") }, + { { STRING_WITH_LEN("sub_id") }, + { STRING_WITH_LEN("bigint(20) unsigned") }, {NULL, 0} }, - { { C_STRING_WITH_LEN("server_id") }, - { C_STRING_WITH_LEN("int(10) unsigned") }, + { { STRING_WITH_LEN("server_id") }, + { STRING_WITH_LEN("int(10) unsigned") }, {NULL, 0} }, - { { C_STRING_WITH_LEN("seq_no") }, - { C_STRING_WITH_LEN("bigint(20) unsigned") }, + { { STRING_WITH_LEN("seq_no") }, + { STRING_WITH_LEN("bigint(20) unsigned") }, {NULL, 0} }, }; @@ -481,6 +482,94 @@ gtid_check_rpl_slave_state_table(TABLE *table) /* + Attempt to find a mysql.gtid_slave_posXXX table that has a storage engine + that is already in use by the current transaction, if any. +*/ +void +rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename) +{ + struct gtid_pos_table *list, *table_entry, *default_entry; + + /* + See comments on rpl_slave_state::gtid_pos_tables for rules around proper + access to the list. + */ + list= (struct gtid_pos_table *) + my_atomic_loadptr_explicit(&gtid_pos_tables, MY_MEMORY_ORDER_ACQUIRE); + + Ha_trx_info *ha_info; + uint count = 0; + for (ha_info= thd->transaction.all.ha_list; ha_info; ha_info= ha_info->next()) + { + void *trx_hton= ha_info->ht(); + table_entry= list; + + if (!ha_info->is_trx_read_write() || trx_hton == binlog_hton) + continue; + while (table_entry) + { + if (table_entry->table_hton == trx_hton) + { + if (likely(table_entry->state == GTID_POS_AVAILABLE)) + { + *out_tablename= table_entry->table_name; + /* + Check if this is a cross-engine transaction, so we can correctly + maintain the rpl_transactions_multi_engine status variable. 
+ */ + if (count >= 1) + statistic_increment(rpl_transactions_multi_engine, LOCK_status); + else + { + for (;;) + { + ha_info= ha_info->next(); + if (!ha_info) + break; + if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton) + { + statistic_increment(rpl_transactions_multi_engine, LOCK_status); + break; + } + } + } + return; + } + /* + This engine is marked to automatically create the table. + We cannot easily do this here (possibly in the middle of a + transaction). But we can request the slave background thread + to create it, and in a short while it should become available + for following transactions. + */ +#ifdef HAVE_REPLICATION + slave_background_gtid_pos_create_request(table_entry); +#endif + break; + } + table_entry= table_entry->next; + } + ++count; + } + /* + If we cannot find any table whose engine matches an engine that is + already active in the transaction, or if there is no current transaction + engines available, we return the default gtid_slave_pos table. + */ + default_entry= (struct gtid_pos_table *) + my_atomic_loadptr_explicit(&default_gtid_pos_table, MY_MEMORY_ORDER_ACQUIRE); + *out_tablename= default_entry->table_name; + /* Record in status that we failed to find a suitable gtid_pos table. */ + if (count > 0) + { + statistic_increment(transactions_gtid_foreign_engine, LOCK_status); + if (count > 1) + statistic_increment(rpl_transactions_multi_engine, LOCK_status); + } +} + + +/* Write a gtid to the replication slave state table. gtid The global transaction id for this event group. 
@@ -496,19 +585,24 @@ gtid_check_rpl_slave_state_table(TABLE *table) */ int rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, - rpl_group_info *rgi, bool in_statement) + rpl_group_info *rgi, bool in_statement, + void **out_hton) { TABLE_LIST tlist; - int err= 0; + int err= 0, not_sql_thread; bool table_opened= false; TABLE *table; - list_element *elist= 0, *cur, *next; + list_element *delete_list= 0, *next, *cur, **next_ptr_ptr, **best_ptr_ptr; + uint64 best_sub_id; element *elem; ulonglong thd_saved_option= thd->variables.option_bits; Query_tables_list lex_backup; wait_for_commit* suspended_wfc; + void *hton= NULL; + LEX_CSTRING gtid_pos_table_name; DBUG_ENTER("record_gtid"); + *out_hton= NULL; if (unlikely(!loaded)) { /* @@ -524,6 +618,25 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, if (!in_statement) thd->reset_for_next_command(); + /* + Only the SQL thread can call select_gtid_pos_table without a mutex + Other threads needs to use a mutex and take into account that the + result may change during execution, so we have to make a copy. 
+ */ + + if ((not_sql_thread= (thd->system_thread != SYSTEM_THREAD_SLAVE_SQL))) + mysql_mutex_lock(&LOCK_slave_state); + select_gtid_pos_table(thd, &gtid_pos_table_name); + if (not_sql_thread) + { + LEX_CSTRING *tmp= thd->make_clex_string(gtid_pos_table_name.str, + gtid_pos_table_name.length); + mysql_mutex_unlock(&LOCK_slave_state); + if (!tmp) + DBUG_RETURN(1); + gtid_pos_table_name= *tmp; + } + DBUG_EXECUTE_IF("gtid_inject_record_gtid", { my_error(ER_CANNOT_UPDATE_GTID_STATE, MYF(0)); @@ -553,14 +666,12 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, */ suspended_wfc= thd->suspend_subsequent_commits(); thd->lex->reset_n_backup_query_tables_list(&lex_backup); - tlist.init_one_table(STRING_WITH_LEN("mysql"), - rpl_gtid_slave_state_table_name.str, - rpl_gtid_slave_state_table_name.length, - NULL, TL_WRITE); + tlist.init_one_table(&MYSQL_SCHEMA_NAME, &gtid_pos_table_name, NULL, TL_WRITE); if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) goto end; table_opened= true; table= tlist.table; + hton= table->s->db_type(); if ((err= gtid_check_rpl_slave_state_table(table))) goto end; @@ -596,6 +707,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, table->file->print_error(err, MYF(0)); goto end; } + *out_hton= hton; if(opt_bin_log && (err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id, @@ -613,36 +725,62 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, err= 1; goto end; } - if ((elist= elem->grab_list()) != NULL) + + /* Now pull out all GTIDs that were recorded in this engine. */ + delete_list = NULL; + next_ptr_ptr= &elem->list; + cur= elem->list; + best_sub_id= 0; + best_ptr_ptr= NULL; + while (cur) { - /* Delete any old stuff, but keep around the most recent one. 
*/ - uint64 best_sub_id= elist->sub_id; - list_element **best_ptr_ptr= &elist; - cur= elist; - while ((next= cur->next)) + list_element *next= cur->next; + if (cur->hton == hton) { - if (next->sub_id > best_sub_id) + /* Belongs to same engine, so move it to the delete list. */ + cur->next= delete_list; + delete_list= cur; + if (cur->sub_id > best_sub_id) { - best_sub_id= next->sub_id; + best_sub_id= cur->sub_id; + best_ptr_ptr= &delete_list; + } + else if (best_ptr_ptr == &delete_list) best_ptr_ptr= &cur->next; + } + else + { + /* Another engine, leave it in the list. */ + if (cur->sub_id > best_sub_id) + { + best_sub_id= cur->sub_id; + /* Current best is not on the delete list. */ + best_ptr_ptr= NULL; } - cur= next; + *next_ptr_ptr= cur; + next_ptr_ptr= &cur->next; } - /* - Delete the highest sub_id element from the old list, and put it back as - the single-element new list. - */ + cur= next; + } + *next_ptr_ptr= NULL; + /* + If the highest sub_id element is on the delete list, put it back on the + original list, to preserve the highest sub_id element in the table for + GTID position recovery. + */ + if (best_ptr_ptr) + { cur= *best_ptr_ptr; *best_ptr_ptr= cur->next; - cur->next= NULL; + cur->next= elem->list; elem->list= cur; } mysql_mutex_unlock(&LOCK_slave_state); - if (!elist) + if (!delete_list) goto end; - /* Now delete any already committed rows. */ + /* Now delete any already committed GTIDs. */ bitmap_set_bit(table->read_set, table->field[0]->field_index); bitmap_set_bit(table->read_set, table->field[1]->field_index); @@ -651,7 +789,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, table->file->print_error(err, MYF(0)); goto end; } - cur = elist; + cur = delete_list; while (cur) { uchar key_buffer[4+8]; @@ -696,13 +834,13 @@ end: if (err || (err= ha_commit_trans(thd, FALSE))) { /* - If error, we need to put any remaining elist back into the HASH so we - can do another delete attempt later. 
+ If error, we need to put any remaining delete_list back into the HASH + so we can do another delete attempt later. */ - if (elist) + if (delete_list) { - put_back_list(gtid->domain_id, elist); - elist = 0; + put_back_list(gtid->domain_id, delete_list); + delete_list = 0; } ha_rollback_trans(thd, FALSE); @@ -720,14 +858,14 @@ end: are rolled back and retried after record_gtid(). */ #ifdef HAVE_REPLICATION - rgi->pending_gtid_deletes_save(gtid->domain_id, elist); + rgi->pending_gtid_deletes_save(gtid->domain_id, delete_list); #endif } else { thd->mdl_context.release_transactional_locks(); #ifdef HAVE_REPLICATION - rpl_group_info::pending_gtid_deletes_free(elist); + rpl_group_info::pending_gtid_deletes_free(delete_list); #endif } } @@ -1013,24 +1151,24 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid) Returns 0 on ok, non-zero on parse error. */ static int -gtid_parser_helper(char **ptr, char *end, rpl_gtid *out_gtid) +gtid_parser_helper(const char **ptr, const char *end, rpl_gtid *out_gtid) { char *q; - char *p= *ptr; + const char *p= *ptr; uint64 v1, v2, v3; int err= 0; - q= end; + q= (char*) end; v1= (uint64)my_strtoll10(p, &q, &err); if (err != 0 || v1 > (uint32)0xffffffff || q == end || *q != '-') return 1; p= q+1; - q= end; + q= (char*) end; v2= (uint64)my_strtoll10(p, &q, &err); if (err != 0 || v2 > (uint32)0xffffffff || q == end || *q != '-') return 1; p= q+1; - q= end; + q= (char*) end; v3= (uint64)my_strtoll10(p, &q, &err); if (err != 0) return 1; @@ -1046,8 +1184,8 @@ gtid_parser_helper(char **ptr, char *end, rpl_gtid *out_gtid) rpl_gtid * gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len) { - char *p= const_cast<char *>(str); - char *end= p + str_len; + const char *p= const_cast<char *>(str); + const char *end= p + str_len; uint32 len= 0, alloc_len= 5; rpl_gtid *list= NULL; @@ -1092,10 +1230,10 @@ gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len) Returns 0 if ok, non-zero if error. 
*/ int -rpl_slave_state::load(THD *thd, char *state_from_master, size_t len, +rpl_slave_state::load(THD *thd, const char *state_from_master, size_t len, bool reset, bool in_statement) { - char *end= state_from_master + len; + const char *end= state_from_master + len; if (reset) { @@ -1109,11 +1247,12 @@ rpl_slave_state::load(THD *thd, char *state_from_master, size_t len, { rpl_gtid gtid; uint64 sub_id; + void *hton= NULL; if (gtid_parser_helper(&state_from_master, end, &gtid) || !(sub_id= next_sub_id(gtid.domain_id)) || - record_gtid(thd, &gtid, sub_id, NULL, in_statement) || - update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, NULL)) + record_gtid(thd, &gtid, sub_id, NULL, in_statement, &hton) || + update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL)) return 1; if (state_from_master == end) break; @@ -1147,6 +1286,75 @@ rpl_slave_state::is_empty() } +void +rpl_slave_state::free_gtid_pos_tables(struct rpl_slave_state::gtid_pos_table *list) +{ + struct gtid_pos_table *cur, *next; + + cur= list; + while (cur) + { + next= cur->next; + my_free(cur); + cur= next; + } +} + + +/* + Replace the list of available mysql.gtid_slave_posXXX tables with a new list. + The caller must be holding LOCK_slave_state. Additionally, this function + must only be called while all SQL threads are stopped. 
+*/ +void +rpl_slave_state::set_gtid_pos_tables_list(rpl_slave_state::gtid_pos_table *new_list, + rpl_slave_state::gtid_pos_table *default_entry) +{ + gtid_pos_table *old_list; + + mysql_mutex_assert_owner(&LOCK_slave_state); + old_list= (struct gtid_pos_table *)gtid_pos_tables; + my_atomic_storeptr_explicit(&gtid_pos_tables, new_list, MY_MEMORY_ORDER_RELEASE); + my_atomic_storeptr_explicit(&default_gtid_pos_table, default_entry, + MY_MEMORY_ORDER_RELEASE); + free_gtid_pos_tables(old_list); +} + + +void +rpl_slave_state::add_gtid_pos_table(rpl_slave_state::gtid_pos_table *entry) +{ + mysql_mutex_assert_owner(&LOCK_slave_state); + entry->next= (struct gtid_pos_table *)gtid_pos_tables; + my_atomic_storeptr_explicit(&gtid_pos_tables, entry, MY_MEMORY_ORDER_RELEASE); +} + + +struct rpl_slave_state::gtid_pos_table * +rpl_slave_state::alloc_gtid_pos_table(LEX_CSTRING *table_name, void *hton, + rpl_slave_state::gtid_pos_table_state state) +{ + struct gtid_pos_table *p; + char *allocated_str; + + if (!my_multi_malloc(MYF(MY_WME), + &p, sizeof(*p), + &allocated_str, table_name->length+1, + NULL)) + { + my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*p) + table_name->length+1)); + return NULL; + } + memcpy(allocated_str, table_name->str, table_name->length+1); // Also copy '\0' + p->next = NULL; + p->table_hton= hton; + p->table_name.str= allocated_str; + p->table_name.length= table_name->length; + p->state= state; + return p; +} + + void rpl_binlog_state::init() { my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id), @@ -1492,7 +1700,6 @@ rpl_binlog_state::write_to_iocache(IO_CACHE *dest) mysql_mutex_lock(&LOCK_binlog_state); for (i= 0; i < hash.records; ++i) { - size_t res; element *e= (element *)my_hash_element(&hash, i); if (!e->last_gtid) { @@ -1512,8 +1719,8 @@ rpl_binlog_state::write_to_iocache(IO_CACHE *dest) gtid= e->last_gtid; longlong10_to_str(gtid->seq_no, buf, 10); - res= my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id, buf); - if 
(res == (size_t) -1) + if (my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id, + buf)) { res= 1; goto end; @@ -1532,7 +1739,7 @@ rpl_binlog_state::read_from_iocache(IO_CACHE *src) { /* 10-digit - 10-digit - 20-digit \n \0 */ char buf[10+1+10+1+20+1+1]; - char *p, *end; + const char *p, *end; rpl_gtid gtid; int res= 0; @@ -1944,9 +2151,9 @@ slave_connection_state::~slave_connection_state() */ int -slave_connection_state::load(char *slave_request, size_t len) +slave_connection_state::load(const char *slave_request, size_t len) { - char *p, *end; + const char *p, *end; uchar *rec; rpl_gtid *gtid; const entry *e; @@ -2079,15 +2286,14 @@ void slave_connection_state::remove(const rpl_gtid *in_gtid) { uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0); -#ifndef DBUG_OFF +#ifdef DBUG_ASSERT_EXISTS bool err; rpl_gtid *slave_gtid= &((entry *)rec)->gtid; DBUG_ASSERT(rec /* We should never try to remove not present domain_id. */); DBUG_ASSERT(slave_gtid->server_id == in_gtid->server_id); DBUG_ASSERT(slave_gtid->seq_no == in_gtid->seq_no); + err= #endif - - IF_DBUG(err=, ) my_hash_delete(&hash, rec); DBUG_ASSERT(!err); } @@ -2427,7 +2633,7 @@ gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, &stage_master_gtid_wait_primary, &old_stage); do { - if (thd->check_killed()) + if (unlikely(thd->check_killed(1))) break; else if (wait_until) { @@ -2479,7 +2685,7 @@ gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, &stage_master_gtid_wait, &old_stage); did_enter_cond= true; } - while (!elem.done && !thd->check_killed()) + while (!elem.done && likely(!thd->check_killed(1))) { thd_wait_begin(thd, THD_WAIT_BINLOG); if (wait_until) |