diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 242 |
1 files changed, 197 insertions, 45 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 9d4049c6452..7e261dc0233 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -64,6 +64,7 @@ #include "rpl_parallel.h" #include "sql_show.h" #include "semisync_slave.h" +#include "sql_manager.h" #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") @@ -363,9 +364,9 @@ end: } -static void -handle_gtid_pos_auto_create_request(THD *thd, void *hton) +static void bg_gtid_pos_auto_create(void *hton) { + THD *thd= NULL; int UNINIT_VAR(err); plugin_ref engine= NULL, *auto_engines; rpl_slave_state::gtid_pos_table *entry; @@ -377,7 +378,6 @@ handle_gtid_pos_auto_create_request(THD *thd, void *hton) it. */ mysql_mutex_lock(&LOCK_global_system_variables); - engine= NULL; for (auto_engines= opt_gtid_pos_auto_plugins; auto_engines && *auto_engines; ++auto_engines) @@ -422,6 +422,13 @@ handle_gtid_pos_auto_create_request(THD *thd, void *hton) table_name.str= loc_table_name.c_ptr_safe(); table_name.length= loc_table_name.length(); + thd= new THD(next_thread_id()); + thd->thread_stack= (char*) &thd; /* Set approximate stack start */ + thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND; + thd->store_globals(); + thd->security_ctx->skip_grants(); + thd->set_command(COM_DAEMON); + thd->variables.wsrep_on= 0; err= gtid_pos_table_creation(thd, engine, &table_name); if (err) { @@ -449,15 +456,15 @@ handle_gtid_pos_auto_create_request(THD *thd, void *hton) mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); end: + delete thd; if (engine) plugin_unlock(NULL, engine); } -static bool slave_background_thread_running; -static bool slave_background_thread_stop; static bool slave_background_thread_gtid_loaded; +<<<<<<< HEAD static struct slave_background_kill_t { slave_background_kill_t *next; THD *to_kill; @@ -473,21 +480,31 @@ static volatile bool slave_background_gtid_pending_delete_flag; pthread_handler_t handle_slave_background(void *arg __attribute__((unused))) -{ - THD *thd; - PSI_stage_info old_stage; - bool stop; +||||||| 75538f94ca0 +static struct slave_background_kill_t { + slave_background_kill_t *next; + THD *to_kill; +} *slave_background_kill_list; - my_thread_init(); - thd= new THD(next_thread_id()); +static struct slave_background_gtid_pos_create_t { + slave_background_gtid_pos_create_t *next; + void *hton; +} *slave_background_gtid_pos_create_list; + + +pthread_handler_t +handle_slave_background(void *arg __attribute__((unused))) +======= +static void bg_rpl_load_gtid_slave_state(void *) +>>>>>>> bb-10.3-release +{ + THD *thd= new THD(next_thread_id()); thd->thread_stack= (char*) &thd; /* Set approximate stack start */ thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND; thd->store_globals(); thd->security_ctx->skip_grants(); thd->set_command(COM_DAEMON); -#ifdef WITH_WSREP thd->variables.wsrep_on= 0; -#endif thd_proc_info(thd, "Loading slave GTID position from table"); if (rpl_load_gtid_slave_state(thd)) @@ -497,8 +514,10 @@ handle_slave_background(void *arg __attribute__((unused))) thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->message()); - mysql_mutex_lock(&LOCK_slave_background); + // hijacking global_rpl_thread_pool cond here - it's only once on startup + mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); slave_background_thread_gtid_loaded= true; +<<<<<<< HEAD mysql_cond_broadcast(&COND_slave_background); THD_STAGE_INFO(thd, stage_slave_background_process_request); @@ -569,35 +588,100 @@ handle_slave_background(void *arg __attribute__((unused))) mysql_cond_broadcast(&COND_slave_background); mysql_mutex_unlock(&LOCK_slave_background); +||||||| 75538f94ca0 + mysql_cond_broadcast(&COND_slave_background); + + THD_STAGE_INFO(thd, stage_slave_background_process_request); + do + { + slave_background_kill_t *kill_list; + slave_background_gtid_pos_create_t *create_list; + + thd->ENTER_COND(&COND_slave_background, &LOCK_slave_background, + &stage_slave_background_wait_request, + &old_stage); + for (;;) + { + stop= abort_loop || thd->killed || slave_background_thread_stop; + kill_list= slave_background_kill_list; + create_list= slave_background_gtid_pos_create_list; + if (stop || kill_list || create_list) + break; + mysql_cond_wait(&COND_slave_background, &LOCK_slave_background); + } + + slave_background_kill_list= NULL; + slave_background_gtid_pos_create_list= NULL; + thd->EXIT_COND(&old_stage); + + while (kill_list) + { + slave_background_kill_t *p = kill_list; + THD *to_kill= p->to_kill; + kill_list= p->next; + + to_kill->awake(KILL_CONNECTION); + mysql_mutex_lock(&to_kill->LOCK_wakeup_ready); + to_kill->rgi_slave->killed_for_retry= + rpl_group_info::RETRY_KILL_KILLED; + mysql_cond_broadcast(&to_kill->COND_wakeup_ready); + mysql_mutex_unlock(&to_kill->LOCK_wakeup_ready); + my_free(p); + } + + while (create_list) + { + slave_background_gtid_pos_create_t *next= create_list->next; + void *hton= create_list->hton; + handle_gtid_pos_auto_create_request(thd, hton); + my_free(create_list); + create_list= next; + } + + mysql_mutex_lock(&LOCK_slave_background); + } while (!stop); + + slave_background_thread_running= false; + mysql_cond_broadcast(&COND_slave_background); + mysql_mutex_unlock(&LOCK_slave_background); + +======= + mysql_cond_signal(&global_rpl_thread_pool.COND_rpl_thread_pool); + mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); +>>>>>>> bb-10.3-release delete thd; +<<<<<<< HEAD my_thread_end(); return 0; -} +||||||| 75538f94ca0 + thread_safe_decrement32(&service_thread_count); + signal_thd_deleted(); + my_thread_end(); + return 0; +======= +>>>>>>> bb-10.3-release +} +static void bg_slave_kill(void *victim) +{ + THD *to_kill= (THD *)victim; + to_kill->awake(KILL_CONNECTION); + mysql_mutex_lock(&to_kill->LOCK_wakeup_ready); + to_kill->rgi_slave->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED; + mysql_cond_broadcast(&to_kill->COND_wakeup_ready); + mysql_mutex_unlock(&to_kill->LOCK_wakeup_ready); +} -void -slave_background_kill_request(THD *to_kill) +void slave_background_kill_request(THD *to_kill) { if (to_kill->rgi_slave->killed_for_retry) return; // Already deadlock killed. - slave_background_kill_t *p= - (slave_background_kill_t *)my_malloc(sizeof(*p), MYF(MY_WME)); - if (p) - { - p->to_kill= to_kill; - to_kill->rgi_slave->killed_for_retry= - rpl_group_info::RETRY_KILL_PENDING; - mysql_mutex_lock(&LOCK_slave_background); - p->next= slave_background_kill_list; - slave_background_kill_list= p; - mysql_cond_signal(&COND_slave_background); - mysql_mutex_unlock(&LOCK_slave_background); - } + to_kill->rgi_slave->killed_for_retry= rpl_group_info::RETRY_KILL_PENDING; + mysql_manager_submit(bg_slave_kill, to_kill); } - /* This function must only be called from a slave SQL thread (or worker thread), to ensure that the table_entry will not go away before we can lock the @@ -607,23 +691,18 @@ void slave_background_gtid_pos_create_request( rpl_slave_state::gtid_pos_table *table_entry) { - slave_background_gtid_pos_create_t *p; - if (table_entry->state != rpl_slave_state::GTID_POS_AUTO_CREATE) return; - p= (slave_background_gtid_pos_create_t *)my_malloc(sizeof(*p), MYF(MY_WME)); - if (!p) - return; mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); if (table_entry->state != rpl_slave_state::GTID_POS_AUTO_CREATE) { - my_free(p); mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); return; } table_entry->state= rpl_slave_state::GTID_POS_CREATE_REQUESTED; mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); +<<<<<<< HEAD p->hton= table_entry->table_hton; mysql_mutex_lock(&LOCK_slave_background); p->next= slave_background_gtid_pos_create_list; @@ -698,6 +777,67 @@ stop_slave_background_thread() while (slave_background_thread_running) mysql_cond_wait(&COND_slave_background, &LOCK_slave_background); mysql_mutex_unlock(&LOCK_slave_background); +||||||| 75538f94ca0 + p->hton= table_entry->table_hton; + mysql_mutex_lock(&LOCK_slave_background); + p->next= slave_background_gtid_pos_create_list; + slave_background_gtid_pos_create_list= p; + mysql_cond_signal(&COND_slave_background); + mysql_mutex_unlock(&LOCK_slave_background); +} + + +/* + Start the slave background thread. + + This thread is currently used for two purposes: + + 1. To load the GTID state from mysql.gtid_slave_pos at server start; reading + from table requires valid THD, which is otherwise not available during + server init. + + 2. To kill worker thread transactions during parallel replication, when a + storage engine attempts to take an errorneous conflicting lock that would + cause a deadlock. Killing is done asynchroneously, as the kill may not + be safe within the context of a callback from inside storage engine + locking code. +*/ +static int +start_slave_background_thread() +{ + pthread_t th; + + slave_background_thread_running= true; + slave_background_thread_stop= false; + slave_background_thread_gtid_loaded= false; + if (mysql_thread_create(key_thread_slave_background, + &th, &connection_attrib, handle_slave_background, + NULL)) + { + sql_print_error("Failed to create thread while initialising slave"); + return 1; + } + mysql_mutex_lock(&LOCK_slave_background); + while (!slave_background_thread_gtid_loaded) + mysql_cond_wait(&COND_slave_background, &LOCK_slave_background); + mysql_mutex_unlock(&LOCK_slave_background); + + return 0; +} + + +static void +stop_slave_background_thread() +{ + mysql_mutex_lock(&LOCK_slave_background); + slave_background_thread_stop= true; + mysql_cond_broadcast(&COND_slave_background); + while (slave_background_thread_running) + mysql_cond_wait(&COND_slave_background, &LOCK_slave_background); + mysql_mutex_unlock(&LOCK_slave_background); +======= + mysql_manager_submit(bg_gtid_pos_auto_create, table_entry->table_hton); +>>>>>>> bb-10.3-release } @@ -712,12 +852,19 @@ int init_slave() init_slave_psi_keys(); #endif - if (start_slave_background_thread()) - return 1; - if (global_rpl_thread_pool.init(opt_slave_parallel_threads)) return 1; + slave_background_thread_gtid_loaded= false; + mysql_manager_submit(bg_rpl_load_gtid_slave_state, NULL); + + // hijacking global_rpl_thread_pool cond here - it's only once on startup + mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); + while (!slave_background_thread_gtid_loaded) + mysql_cond_wait(&global_rpl_thread_pool.COND_rpl_thread_pool, + &global_rpl_thread_pool.LOCK_rpl_thread_pool); + mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool); + /* This is called when mysqld starts. Before client connections are accepted. However bootstrap may conflict with us if it does START SLAVE. @@ -1443,10 +1590,15 @@ void slave_prepare_for_shutdown() mysql_mutex_lock(&LOCK_active_mi); master_info_index->free_connections(); mysql_mutex_unlock(&LOCK_active_mi); +<<<<<<< HEAD // It's safe to destruct worker pool now when // all driver threads are gone. global_rpl_thread_pool.deactivate(); stop_slave_background_thread(); +||||||| 75538f94ca0 + stop_slave_background_thread(); +======= +>>>>>>> bb-10.3-release } /* @@ -1477,8 +1629,6 @@ void end_slave() active_mi= 0; mysql_mutex_unlock(&LOCK_active_mi); - stop_slave_background_thread(); - global_rpl_thread_pool.destroy(); free_all_rpl_filters(); DBUG_VOID_RETURN; @@ -4737,10 +4887,7 @@ pthread_handler_t handle_slave_io(void *arg) goto err; } - -#ifdef WITH_WSREP thd->variables.wsrep_on= 0; -#endif if (DBUG_EVALUATE_IF("failed_slave_start", 1, 0) || repl_semisync_slave.slave_start(mi)) { @@ -5060,8 +5207,11 @@ log space"); err: // print the current replication position if (mi->using_gtid == Master_info::USE_GTID_NO) + { sql_print_information("Slave I/O thread exiting, read up to log '%s', " "position %llu", IO_RPL_LOG_NAME, mi->master_log_pos); + sql_print_information("master was %s:%d", mi->host, mi->port); + } else { StringBuffer<100> tmp; @@ -5070,6 +5220,7 @@ err: "position %llu; GTID position %s", IO_RPL_LOG_NAME, mi->master_log_pos, tmp.c_ptr_safe()); + sql_print_information("master was %s:%d", mi->host, mi->port); } repl_semisync_slave.slave_stop(mi); thd->reset_query(); @@ -5672,6 +5823,7 @@ pthread_handler_t handle_slave_sql(void *arg) sql_print_information("Slave SQL thread exiting, replication stopped in " "log '%s' at position %llu%s", RPL_LOG_NAME, rli->group_master_log_pos, tmp.c_ptr_safe()); + sql_print_information("master was %s:%d", mi->host, mi->port); } #ifdef WITH_WSREP wsrep_after_command_before_result(thd); |