diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 226 |
1 files changed, 224 insertions, 2 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index a7f0f003e5c..67785be6255 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -60,6 +60,7 @@ #include "rpl_tblmap.h" #include "debug_sync.h" #include "rpl_parallel.h" +#include "sql_show.h" #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") @@ -279,15 +280,180 @@ static void init_slave_psi_keys(void) #endif /* HAVE_PSI_INTERFACE */ +/* + Note: This definition needs to be kept in sync with the one in + mysql_system_tables.sql which is used by mysql_create_db. +*/ +static const char gtid_pos_table_definition1[]= + "CREATE TABLE "; +static const char gtid_pos_table_definition2[]= + " (domain_id INT UNSIGNED NOT NULL, " + "sub_id BIGINT UNSIGNED NOT NULL, " + "server_id INT UNSIGNED NOT NULL, " + "seq_no BIGINT UNSIGNED NOT NULL, " + "PRIMARY KEY (domain_id, sub_id)) CHARSET=latin1 " + "COMMENT='Replication slave GTID position' " + "ENGINE="; + +/* + Build a query string + CREATE TABLE mysql.gtid_slave_pos_<engine> ... ENGINE=<engine> +*/ +static bool +build_gtid_pos_create_query(THD *thd, String *query, + LEX_STRING *table_name, + LEX_STRING *engine_name) +{ + bool err= false; + err|= query->append(gtid_pos_table_definition1); + err|= append_identifier(thd, query, table_name->str, table_name->length); + err|= query->append(gtid_pos_table_definition2); + err|= append_identifier(thd, query, engine_name->str, engine_name->length); + return err; +} + + +static int +gtid_pos_table_creation(THD *thd, plugin_ref engine, LEX_STRING *table_name) +{ + int err; + StringBuffer<sizeof(gtid_pos_table_definition1) + + sizeof(gtid_pos_table_definition1) + + 2*FN_REFLEN> query; + + if (build_gtid_pos_create_query(thd, &query, table_name, plugin_name(engine))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; + } + + thd->set_db("mysql", 5); + thd->clear_error(); + ulonglong thd_saved_option= thd->variables.option_bits; + /* This query shuold not be binlogged. */ + thd->variables.option_bits&= ~(ulonglong)OPTION_BIN_LOG; + thd->set_query_and_id(query.c_ptr(), query.length(), thd->charset(), + next_query_id()); + Parser_state parser_state; + err= parser_state.init(thd, thd->query(), thd->query_length()); + if (err) + goto end; + mysql_parse(thd, thd->query(), thd->query_length(), &parser_state, + FALSE, FALSE); + if (thd->is_error()) + err= 1; +end: + thd->variables.option_bits= thd_saved_option; + thd->reset_query(); + return err; +} + + +static void +handle_gtid_pos_auto_create_request(THD *thd, void *hton) +{ + int err; + plugin_ref engine= NULL, *auto_engines; + rpl_slave_state::gtid_pos_table *entry; + StringBuffer<FN_REFLEN> loc_table_name; + LEX_STRING table_name; + + /* + Check that the plugin is still in @@gtid_pos_auto_engines, and lock + it. + */ + mysql_mutex_lock(&LOCK_global_system_variables); + engine= NULL; + for (auto_engines= opt_gtid_pos_auto_plugins; + auto_engines && *auto_engines; + ++auto_engines) + { + if (plugin_hton(*auto_engines) == hton) + { + engine= my_plugin_lock(NULL, *auto_engines); + break; + } + } + mysql_mutex_unlock(&LOCK_global_system_variables); + if (!engine) + { + /* The engine is gone from @@gtid_pos_auto_engines, so no action. */ + goto end; + } + + /* Find the entry for the table to auto-create. */ + mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); + entry= (rpl_slave_state::gtid_pos_table *) + rpl_global_gtid_slave_state->gtid_pos_tables; + while (entry) + { + if (entry->table_hton == hton && + entry->state == rpl_slave_state::GTID_POS_CREATE_REQUESTED) + break; + entry= entry->next; + } + if (entry) + { + entry->state = rpl_slave_state::GTID_POS_CREATE_IN_PROGRESS; + err= loc_table_name.append(entry->table_name.str, entry->table_name.length); + } + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + if (!entry) + goto end; + if (err) + { + sql_print_error("Out of memory while trying to auto-create GTID position table"); + goto end; + } + table_name.str= loc_table_name.c_ptr_safe(); + table_name.length= loc_table_name.length(); + + err= gtid_pos_table_creation(thd, engine, &table_name); + if (err) + { + sql_print_error("Error auto-creating GTID position table `mysql.%s`: %s Error_code: %d", + table_name.str, thd->get_stmt_da()->message(), + thd->get_stmt_da()->sql_errno()); + thd->clear_error(); + goto end; + } + + /* Now enable the entry for the auto-created table. */ + mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); + entry= (rpl_slave_state::gtid_pos_table *) + rpl_global_gtid_slave_state->gtid_pos_tables; + while (entry) + { + if (entry->table_hton == hton && + entry->state == rpl_slave_state::GTID_POS_CREATE_IN_PROGRESS) + { + entry->state= rpl_slave_state::GTID_POS_AVAILABLE; + break; + } + entry= entry->next; + } + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + +end: + if (engine) + plugin_unlock(NULL, engine); +} + + static bool slave_background_thread_running; static bool slave_background_thread_stop; static bool slave_background_thread_gtid_loaded; -struct slave_background_kill_t { +static struct slave_background_kill_t { slave_background_kill_t *next; THD *to_kill; } *slave_background_kill_list; +static struct slave_background_gtid_pos_create_t { + slave_background_gtid_pos_create_t *next; + void *hton; +} *slave_background_gtid_pos_create_list; + pthread_handler_t handle_slave_background(void *arg __attribute__((unused))) @@ -321,6 +487,7 @@ handle_slave_background(void *arg __attribute__((unused))) do { slave_background_kill_t *kill_list; + slave_background_gtid_pos_create_t *create_list; thd->ENTER_COND(&COND_slave_background, &LOCK_slave_background, &stage_slave_background_wait_request, @@ -329,12 +496,14 @@ handle_slave_background(void *arg __attribute__((unused))) { stop= abort_loop || thd->killed || slave_background_thread_stop; kill_list= slave_background_kill_list; - if (stop || kill_list) + create_list= slave_background_gtid_pos_create_list; + if (stop || kill_list || create_list) break; mysql_cond_wait(&COND_slave_background, &LOCK_slave_background); } slave_background_kill_list= NULL; + slave_background_gtid_pos_create_list= NULL; thd->EXIT_COND(&old_stage); while (kill_list) @@ -353,6 +522,16 @@ handle_slave_background(void *arg __attribute__((unused))) mysql_mutex_unlock(&to_kill->LOCK_wakeup_ready); my_free(p); } + + while (create_list) + { + slave_background_gtid_pos_create_t *next= create_list->next; + void *hton= create_list->hton; + handle_gtid_pos_auto_create_request(thd, hton); + my_free(create_list); + create_list= next; + } + mysql_mutex_lock(&LOCK_slave_background); } while (!stop); @@ -392,6 +571,41 @@ slave_background_kill_request(THD *to_kill) /* + This function must only be called from a slave SQL thread (or worker thread), + to ensure that the table_entry will not go away before we can lock the + LOCK_slave_state. +*/ +void +slave_background_gtid_pos_create_request( + rpl_slave_state::gtid_pos_table *table_entry) +{ + slave_background_gtid_pos_create_t *p; + + if (table_entry->state != rpl_slave_state::GTID_POS_AUTO_CREATE) + return; + p= (slave_background_gtid_pos_create_t *)my_malloc(sizeof(*p), MYF(MY_WME)); + if (!p) + return; + mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); + if (table_entry->state != rpl_slave_state::GTID_POS_AUTO_CREATE) + { + my_free(p); + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + return; + } + table_entry->state= rpl_slave_state::GTID_POS_CREATE_REQUESTED; + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + + p->hton= table_entry->table_hton; + mysql_mutex_lock(&LOCK_slave_background); + p->next= slave_background_gtid_pos_create_list; + slave_background_gtid_pos_create_list= p; + mysql_cond_signal(&COND_slave_background); + mysql_mutex_unlock(&LOCK_slave_background); +} + + +/* Start the slave background thread. This thread is currently used for two purposes: @@ -5065,6 +5279,14 @@ pthread_handler_t handle_slave_sql(void *arg) if (mi->using_gtid != Master_info::USE_GTID_NO || opt_gtid_strict_mode) goto err; } + /* Re-load the set of mysql.gtid_slave_posXXX tables available. */ + if (find_gtid_slave_pos_tables(thd)) + { + rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL, + "Error processing replication GTID position tables: %s", + thd->get_stmt_da()->message()); + goto err; + } /* execute init_slave variable */ if (opt_init_slave.length) |