diff options
Diffstat (limited to 'sql/rpl_gtid.cc')
-rw-r--r-- | sql/rpl_gtid.cc | 443 |
1 files changed, 280 insertions, 163 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index a8b39d6d15b..f58df09623c 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -80,7 +80,7 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi) rgi->gtid_pending= false; if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE) { - if (record_gtid(thd, &rgi->current_gtid, sub_id, NULL, false, &hton)) + if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false, &hton)) DBUG_RETURN(1); update_state_hash(sub_id, &rgi->current_gtid, hton, rgi); } @@ -245,7 +245,7 @@ rpl_slave_state_free_element(void *arg) rpl_slave_state::rpl_slave_state() - : last_sub_id(0), gtid_pos_tables(0), loaded(false) + : pending_gtid_count(0), last_sub_id(0), gtid_pos_tables(0), loaded(false) { mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state, MY_MUTEX_INIT_SLOW); @@ -257,7 +257,7 @@ rpl_slave_state::rpl_slave_state() rpl_slave_state::~rpl_slave_state() { - free_gtid_pos_tables((struct gtid_pos_table *)gtid_pos_tables); + free_gtid_pos_tables(gtid_pos_tables.load(std::memory_order_relaxed)); truncate_hash(); my_hash_free(&hash); delete_dynamic(>id_sort_array); @@ -332,14 +332,11 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, } } rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL; - -#ifdef HAVE_REPLICATION - rgi->pending_gtid_deletes_clear(); -#endif } if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME)))) return 1; + list_elem->domain_id= domain_id; list_elem->server_id= server_id; list_elem->sub_id= sub_id; list_elem->seq_no= seq_no; @@ -349,6 +346,15 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, if (last_sub_id < sub_id) last_sub_id= sub_id; +#ifdef HAVE_REPLICATION + ++pending_gtid_count; + if (pending_gtid_count >= opt_gtid_cleanup_batch_size) + { + pending_gtid_count = 0; + slave_background_gtid_pending_delete_request(); + } +#endif + return 0; } @@ -383,20 +389,22 @@ rpl_slave_state::get_element(uint32 domain_id) int -rpl_slave_state::put_back_list(uint32 domain_id, list_element *list) +rpl_slave_state::put_back_list(list_element *list) { - element *e; + element *e= NULL; int err= 0; mysql_mutex_lock(&LOCK_slave_state); - if (!(e= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0))) - { - err= 1; - goto end; - } while (list) { list_element *next= list->next; + + if ((!e || e->domain_id != list->domain_id) && + !(e= (element *)my_hash_search(&hash, (const uchar *)&list->domain_id, 0))) + { + err= 1; + goto end; + } e->add(list); list= next; } @@ -489,21 +497,18 @@ gtid_check_rpl_slave_state_table(TABLE *table) void rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename) { - struct gtid_pos_table *list, *table_entry, *default_entry; - /* See comments on rpl_slave_state::gtid_pos_tables for rules around proper access to the list. */ - list= (struct gtid_pos_table *) - my_atomic_loadptr_explicit(>id_pos_tables, MY_MEMORY_ORDER_ACQUIRE); + auto list= gtid_pos_tables.load(std::memory_order_acquire); Ha_trx_info *ha_info; uint count = 0; for (ha_info= thd->transaction.all.ha_list; ha_info; ha_info= ha_info->next()) { void *trx_hton= ha_info->ht(); - table_entry= list; + auto table_entry= list; if (!ha_info->is_trx_read_write() || trx_hton == binlog_hton) continue; @@ -557,9 +562,8 @@ rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename) already active in the transaction, or if there is no current transaction engines available, we return the default gtid_slave_pos table. */ - default_entry= (struct gtid_pos_table *) - my_atomic_loadptr_explicit(&default_gtid_pos_table, MY_MEMORY_ORDER_ACQUIRE); - *out_tablename= default_entry->table_name; + *out_tablename= + default_gtid_pos_table.load(std::memory_order_acquire)->table_name; /* Record in status that we failed to find a suitable gtid_pos table. */ if (count > 0) { @@ -573,12 +577,12 @@ rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename) /* Write a gtid to the replication slave state table. + Do it as part of the transaction, to get slave crash safety, or as a separate + transaction if !in_transaction (eg. MyISAM or DDL). + gtid The global transaction id for this event group. sub_id Value allocated within the sub_id when the event group was read (sub_id must be consistent with commit order in master binlog). - rgi rpl_group_info context, if we are recording the gtid transactionally - as part of replicating a transactional event. NULL if called from - outside of a replicated transaction. Note that caller must later ensure that the new gtid and sub_id is inserted into the appropriate HASH element with rpl_slave_state.add(), so that it can @@ -586,16 +590,13 @@ rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename) */ int rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, - rpl_group_info *rgi, bool in_statement, + bool in_transaction, bool in_statement, void **out_hton) { TABLE_LIST tlist; int err= 0, not_sql_thread; bool table_opened= false; TABLE *table; - list_element *delete_list= 0, *next, *cur, **next_ptr_ptr, **best_ptr_ptr; - uint64 best_sub_id; - element *elem; ulonglong thd_saved_option= thd->variables.option_bits; Query_tables_list lex_backup; wait_for_commit* suspended_wfc; @@ -685,7 +686,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, thd->wsrep_ignore_table= true; #endif - if (!rgi) + if (!in_transaction) { DBUG_PRINT("info", ("resetting OPTION_BEGIN")); thd->variables.option_bits&= @@ -717,168 +718,287 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, my_error(ER_OUT_OF_RESOURCES, MYF(0)); goto end; } +end: - mysql_mutex_lock(&LOCK_slave_state); - if ((elem= get_element(gtid->domain_id)) == NULL) +#ifdef WITH_WSREP + thd->wsrep_ignore_table= false; +#endif + + if (table_opened) { - mysql_mutex_unlock(&LOCK_slave_state); - my_error(ER_OUT_OF_RESOURCES, MYF(0)); - err= 1; - goto end; + if (err || (err= ha_commit_trans(thd, FALSE))) + ha_rollback_trans(thd, FALSE); + + close_thread_tables(thd); + if (in_transaction) + thd->mdl_context.release_statement_locks(); + else + thd->mdl_context.release_transactional_locks(); } + thd->lex->restore_backup_query_tables_list(&lex_backup); + thd->variables.option_bits= thd_saved_option; + thd->resume_subsequent_commits(suspended_wfc); + DBUG_EXECUTE_IF("inject_record_gtid_serverid_100_sleep", + { + if (gtid->server_id == 100) + my_sleep(500000); + }); + DBUG_RETURN(err); +} - /* Now pull out all GTIDs that were recorded in this engine. */ - delete_list = NULL; - next_ptr_ptr= &elem->list; - cur= elem->list; - best_sub_id= 0; - best_ptr_ptr= NULL; - while (cur) + +/* + Return a list of all old GTIDs in any mysql.gtid_slave_pos* table that are + no longer needed and can be deleted from the table. + + Within each domain, we need to keep around the latest GTID (the one with the + highest sub_id), but any others in that domain can be deleted. +*/ +rpl_slave_state::list_element * +rpl_slave_state::gtid_grab_pending_delete_list() +{ + uint32 i; + list_element *full_list; + + mysql_mutex_lock(&LOCK_slave_state); + full_list= NULL; + for (i= 0; i < hash.records; ++i) { - list_element *next= cur->next; - if (cur->hton == hton) - { - /* Belongs to same engine, so move it to the delete list. */ - cur->next= delete_list; - delete_list= cur; - if (cur->sub_id > best_sub_id) + element *elem= (element *)my_hash_element(&hash, i); + list_element *elist= elem->list; + list_element *last_elem, **best_ptr_ptr, *cur, *next; + uint64 best_sub_id; + + if (!elist) + continue; /* Nothing here */ + + /* Delete any old stuff, but keep around the most recent one. */ + cur= elist; + best_sub_id= cur->sub_id; + best_ptr_ptr= &elist; + last_elem= cur; + while ((next= cur->next)) { + last_elem= next; + if (next->sub_id > best_sub_id) { - best_sub_id= cur->sub_id; - best_ptr_ptr= &delete_list; - } - else if (best_ptr_ptr == &delete_list) + best_sub_id= next->sub_id; best_ptr_ptr= &cur->next; - } - else - { - /* Another engine, leave it in the list. */ - if (cur->sub_id > best_sub_id) - { - best_sub_id= cur->sub_id; - /* Current best is not on the delete list. */ - best_ptr_ptr= NULL; } - *next_ptr_ptr= cur; - next_ptr_ptr= &cur->next; + cur= next; } - cur= next; - } - *next_ptr_ptr= NULL; - /* - If the highest sub_id element is on the delete list, put it back on the - original list, to preserve the highest sub_id element in the table for - GTID position recovery. - */ - if (best_ptr_ptr) - { + /* + Append the new elements to the full list. Note the order is important; + we do it here so that we do not break the list if best_sub_id is the + last of the new elements. + */ + last_elem->next= full_list; + /* + Delete the highest sub_id element from the old list, and put it back as + the single-element new list. + */ cur= *best_ptr_ptr; *best_ptr_ptr= cur->next; - cur->next= elem->list; + cur->next= NULL; elem->list= cur; + + /* + Collect the full list so far here. Note that elist may have moved if we + deleted the first element, so order is again important. + */ + full_list= elist; } mysql_mutex_unlock(&LOCK_slave_state); - if (!delete_list) - goto end; + return full_list; +} - /* Now delete any already committed GTIDs. */ - bitmap_set_bit(table->read_set, table->field[0]->field_index); - bitmap_set_bit(table->read_set, table->field[1]->field_index); - if ((err= table->file->ha_index_init(0, 0))) +/* Find the mysql.gtid_slave_posXXX table associated with a given hton. */ +LEX_CSTRING * +rpl_slave_state::select_gtid_pos_table(void *hton) +{ + /* + See comments on rpl_slave_state::gtid_pos_tables for rules around proper + access to the list. + */ + auto table_entry= gtid_pos_tables.load(std::memory_order_acquire); + + while (table_entry) { - table->file->print_error(err, MYF(0)); - goto end; + if (table_entry->table_hton == hton) + { + if (likely(table_entry->state == GTID_POS_AVAILABLE)) + return &table_entry->table_name; + } + table_entry= table_entry->next; } - cur = delete_list; - while (cur) - { - uchar key_buffer[4+8]; - DBUG_EXECUTE_IF("gtid_slave_pos_simulate_failed_delete", - { err= ENOENT; - table->file->print_error(err, MYF(0)); - /* `break' does not work inside DBUG_EXECUTE_IF */ - goto dbug_break; }); + return &default_gtid_pos_table.load(std::memory_order_acquire)->table_name; +} + - next= cur->next; +void +rpl_slave_state::gtid_delete_pending(THD *thd, + rpl_slave_state::list_element **list_ptr) +{ + int err= 0; + ulonglong thd_saved_option; + + if (unlikely(!loaded)) + return; + +#ifdef WITH_WSREP + /* + Updates in slave state table should not be appended to galera transaction + writeset. + */ + thd->wsrep_ignore_table= true; +#endif + + thd_saved_option= thd->variables.option_bits; + thd->variables.option_bits&= + ~(ulonglong)(OPTION_NOT_AUTOCOMMIT |OPTION_BEGIN |OPTION_BIN_LOG | + OPTION_GTID_BEGIN); + + while (*list_ptr) + { + LEX_CSTRING *gtid_pos_table_name, *tmp_table_name; + Query_tables_list lex_backup; + TABLE_LIST tlist; + TABLE *table; + handler::Table_flags direct_pos= 0; + list_element *cur, **cur_ptr_ptr; + bool table_opened= false; + bool index_inited= false; + void *hton= (*list_ptr)->hton; + + thd->reset_for_next_command(); - table->field[1]->store(cur->sub_id, true); - /* domain_id is already set in table->record[0] from write_row() above. */ - key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false); - if (table->file->ha_index_read_map(table->record[1], key_buffer, - HA_WHOLE_KEY, HA_READ_KEY_EXACT)) - /* We cannot find the row, assume it is already deleted. */ - ; - else if ((err= table->file->ha_delete_row(table->record[1]))) - table->file->print_error(err, MYF(0)); /* - In case of error, we still discard the element from the list. We do - not want to endlessly error on the same element in case of table - corruption or such. + Only the SQL thread can call select_gtid_pos_table without a mutex + Other threads needs to use a mutex and take into account that the + result may change during execution, so we have to make a copy. */ - cur= next; - if (err) + mysql_mutex_lock(&LOCK_slave_state); + tmp_table_name= select_gtid_pos_table(hton); + gtid_pos_table_name= thd->make_clex_string(tmp_table_name->str, + tmp_table_name->length); + mysql_mutex_unlock(&LOCK_slave_state); + if (!gtid_pos_table_name) + { + /* Out of memory - we can try again later. */ break; - } -IF_DBUG(dbug_break:, ) - table->file->ha_index_end(); + } -end: + thd->lex->reset_n_backup_query_tables_list(&lex_backup); + tlist.init_one_table(&MYSQL_SCHEMA_NAME, gtid_pos_table_name, NULL, TL_WRITE); + if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) + goto end; + table_opened= true; + table= tlist.table; -#ifdef WITH_WSREP - thd->wsrep_ignore_table= false; -#endif + if ((err= gtid_check_rpl_slave_state_table(table))) + goto end; - if (table_opened) - { - if (err || (err= ha_commit_trans(thd, FALSE))) + direct_pos= table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION; + bitmap_set_all(table->write_set); + table->rpl_write_set= table->write_set; + + /* Now delete any already committed GTIDs. */ + bitmap_set_bit(table->read_set, table->field[0]->field_index); + bitmap_set_bit(table->read_set, table->field[1]->field_index); + + if (!direct_pos) { - /* - If error, we need to put any remaining delete_list back into the HASH - so we can do another delete attempt later. - */ - if (delete_list) + if ((err= table->file->ha_index_init(0, 0))) { - put_back_list(gtid->domain_id, delete_list); - delete_list = 0; + table->file->print_error(err, MYF(0)); + goto end; } - - ha_rollback_trans(thd, FALSE); + index_inited= true; } - close_thread_tables(thd); - if (rgi) + + cur = *list_ptr; + cur_ptr_ptr = list_ptr; + do { - thd->mdl_context.release_statement_locks(); + uchar key_buffer[4+8]; + list_element *next= cur->next; + + if (cur->hton == hton) + { + int res; + + table->field[0]->store((ulonglong)cur->domain_id, true); + table->field[1]->store(cur->sub_id, true); + if (direct_pos) + { + res= table->file->ha_rnd_pos_by_record(table->record[0]); + } + else + { + key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false); + res= table->file->ha_index_read_map(table->record[0], key_buffer, + HA_WHOLE_KEY, HA_READ_KEY_EXACT); + } + DBUG_EXECUTE_IF("gtid_slave_pos_simulate_failed_delete", + { res= 1; + err= ENOENT; + sql_print_error("<DEBUG> Error deleting old GTID row"); + }); + if (res) + /* We cannot find the row, assume it is already deleted. */ + ; + else if ((err= table->file->ha_delete_row(table->record[0]))) + { + sql_print_error("Error deleting old GTID row: %s", + thd->get_stmt_da()->message()); + /* + In case of error, we still discard the element from the list. We do + not want to endlessly error on the same element in case of table + corruption or such. + */ + } + *cur_ptr_ptr= next; + my_free(cur); + } + else + { + /* Leave this one in the list until we get to the table for its hton. */ + cur_ptr_ptr= &cur->next; + } + cur= next; + if (err) + break; + } while (cur); +end: + if (table_opened) + { + DBUG_ASSERT(direct_pos || index_inited || err); /* - Save the list of old gtid entries we deleted. If this transaction - fails later for some reason and is rolled back, the deletion of those - entries will be rolled back as well, and we will need to put them back - on the to-be-deleted list so we can re-do the deletion. Otherwise - redundant rows in mysql.gtid_slave_pos may accumulate if transactions - are rolled back and retried after record_gtid(). + Index may not be initialized if there was a failure during + 'ha_index_init'. Hence check if index initialization is successful and + then invoke ha_index_end(). Ending an index which is not initialized + will lead to assert. */ -#ifdef HAVE_REPLICATION - rgi->pending_gtid_deletes_save(gtid->domain_id, delete_list); -#endif - } - else - { - thd->mdl_context.release_transactional_locks(); -#ifdef HAVE_REPLICATION - rpl_group_info::pending_gtid_deletes_free(delete_list); -#endif + if (index_inited) + table->file->ha_index_end(); + + if (err || (err= ha_commit_trans(thd, FALSE))) + ha_rollback_trans(thd, FALSE); } + close_thread_tables(thd); + thd->mdl_context.release_transactional_locks(); + thd->lex->restore_backup_query_tables_list(&lex_backup); + + if (err) + break; } - thd->lex->restore_backup_query_tables_list(&lex_backup); thd->variables.option_bits= thd_saved_option; - thd->resume_subsequent_commits(suspended_wfc); - DBUG_EXECUTE_IF("inject_record_gtid_serverid_100_sleep", - { - if (gtid->server_id == 100) - my_sleep(500000); - }); - DBUG_RETURN(err); + +#ifdef WITH_WSREP + thd->wsrep_ignore_table= false; +#endif } @@ -1252,7 +1372,7 @@ rpl_slave_state::load(THD *thd, const char *state_from_master, size_t len, if (gtid_parser_helper(&state_from_master, end, >id) || !(sub_id= next_sub_id(gtid.domain_id)) || - record_gtid(thd, >id, sub_id, NULL, in_statement, &hton) || + record_gtid(thd, >id, sub_id, false, in_statement, &hton) || update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL)) return 1; if (state_from_master == end) @@ -1311,13 +1431,10 @@ void rpl_slave_state::set_gtid_pos_tables_list(rpl_slave_state::gtid_pos_table *new_list, rpl_slave_state::gtid_pos_table *default_entry) { - gtid_pos_table *old_list; - mysql_mutex_assert_owner(&LOCK_slave_state); - old_list= (struct gtid_pos_table *)gtid_pos_tables; - my_atomic_storeptr_explicit(>id_pos_tables, new_list, MY_MEMORY_ORDER_RELEASE); - my_atomic_storeptr_explicit(&default_gtid_pos_table, default_entry, - MY_MEMORY_ORDER_RELEASE); + auto old_list= gtid_pos_tables.load(std::memory_order_relaxed); + gtid_pos_tables.store(new_list, std::memory_order_release); + default_gtid_pos_table.store(default_entry, std::memory_order_release); free_gtid_pos_tables(old_list); } @@ -1326,8 +1443,8 @@ void rpl_slave_state::add_gtid_pos_table(rpl_slave_state::gtid_pos_table *entry) { mysql_mutex_assert_owner(&LOCK_slave_state); - entry->next= (struct gtid_pos_table *)gtid_pos_tables; - my_atomic_storeptr_explicit(>id_pos_tables, entry, MY_MEMORY_ORDER_RELEASE); + entry->next= gtid_pos_tables.load(std::memory_order_relaxed); + gtid_pos_tables.store(entry, std::memory_order_release); } |