diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/sql_class.h | 2 | ||||
-rw-r--r-- | sql/sql_parse.cc | 4 | ||||
-rw-r--r-- | sql/sql_plugin_services.ic | 4 | ||||
-rw-r--r-- | sql/sql_show.h | 2 | ||||
-rw-r--r-- | sql/wsrep_dummy.cc | 8 | ||||
-rw-r--r-- | sql/wsrep_hton.cc | 8 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 94 | ||||
-rw-r--r-- | sql/wsrep_mysqld.h | 22 | ||||
-rw-r--r-- | sql/wsrep_thd.cc | 376 | ||||
-rw-r--r-- | sql/wsrep_thd.h | 1 | ||||
-rw-r--r-- | sql/wsrep_var.cc | 1 |
11 files changed, 492 insertions, 30 deletions
diff --git a/sql/sql_class.h b/sql/sql_class.h index 5ced820a34d..2fe715d0c4a 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -4712,6 +4712,8 @@ public: #ifdef WITH_WSREP const bool wsrep_applier; /* dedicated slave applier thread */ + bool wsrep_killer; /* dedicated background + kill thread */ bool wsrep_applier_closing; /* applier marked to close */ bool wsrep_client_thread; /* to identify client threads*/ bool wsrep_PA_safe; diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 6802816caaf..4d05201d9f0 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -8920,7 +8920,7 @@ void add_join_natural(TABLE_LIST *a, TABLE_LIST *b, List<String> *using_fields, pointer - thread found, and its LOCK_thd_kill is locked. */ -THD *find_thread_by_id(longlong id, bool query_id) +THD *find_thread_by_id(longlong id, bool query_id, bool lock_thd_data) { THD *tmp; mysql_mutex_lock(&LOCK_thread_count); // For unlink from list @@ -8931,6 +8931,8 @@ THD *find_thread_by_id(longlong id, bool query_id) continue; if (id == (query_id ? tmp->query_id : (longlong) tmp->thread_id)) { + // Lock from concurrent data change if requested + if (lock_thd_data) mysql_mutex_lock(&tmp->LOCK_thd_data); mysql_mutex_lock(&tmp->LOCK_thd_kill); // Lock from delete break; } diff --git a/sql/sql_plugin_services.ic b/sql/sql_plugin_services.ic index 7c394b57c3e..64e0d2cb9f3 100644 --- a/sql/sql_plugin_services.ic +++ b/sql/sql_plugin_services.ic @@ -163,6 +163,7 @@ static struct wsrep_service_st wsrep_handler = { wsrep_run_wsrep_commit, wsrep_thd_LOCK, wsrep_thd_UNLOCK, + wsrep_LOCK, wsrep_thd_awake, wsrep_thd_conflict_state, wsrep_thd_conflict_state_str, @@ -186,7 +187,8 @@ static struct wsrep_service_st wsrep_handler = { wsrep_trx_order_before, wsrep_unlock_rollback, wsrep_set_data_home_dir, - wsrep_thd_is_applier + wsrep_thd_is_applier, + wsrep_enqueue_background_kill }; static struct thd_specifics_service_st thd_specifics_handler= diff --git a/sql/sql_show.h b/sql/sql_show.h index 39cbc35230a..804f44a7ffe 100644 --- a/sql/sql_show.h +++ b/sql/sql_show.h @@ -143,7 +143,7 @@ const char* get_one_variable(THD *thd, const SHOW_VAR *variable, /* These functions were under INNODB_COMPATIBILITY_HOOKS */ int get_quote_char_for_identifier(THD *thd, const char *name, size_t length); -THD *find_thread_by_id(longlong id, bool query_id= false); +THD *find_thread_by_id(longlong id, bool query_id= false, bool lock_thd_data= false); class select_result_explain_buffer; /* diff --git a/sql/wsrep_dummy.cc b/sql/wsrep_dummy.cc index 1af74035355..feb86987b3d 100644 --- a/sql/wsrep_dummy.cc +++ b/sql/wsrep_dummy.cc @@ -16,6 +16,7 @@ #include "mariadb.h" #include <sql_class.h> #include <mysql/service_wsrep.h> +#include "wsrep_mysqld.h" my_bool wsrep_thd_is_BF(THD *, my_bool) { return 0; } @@ -163,3 +164,10 @@ void wsrep_log(void (*)(const char *, ...), const char *, ...) my_bool wsrep_thd_is_applier(MYSQL_THD thd) { return false; } + +bool wsrep_enqueue_background_kill(wsrep_kill_t item) +{ return false;} + +void wsrep_LOCK(THD *) +{ } + diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc index 05be257cbcb..1ad704aec1a 100644 --- a/sql/wsrep_hton.cc +++ b/sql/wsrep_hton.cc @@ -142,10 +142,10 @@ void wsrep_post_commit(THD* thd, bool all) => cleanup */ if (thd->wsrep_conflict_state != MUST_REPLAY) - { - WSREP_DEBUG("cleanup transaction for LOCAL_STATE: %s", - WSREP_QUERY(thd)); - } + { + WSREP_DEBUG("cleanup transaction for LOCAL_STATE: %s", + wsrep_thd_query(thd)); + } /* Run post-rollback hook to clean up in the case if some keys were populated for the transaction in provider diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index cfba0ace2cb..df2cbc3f4d3 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -37,6 +37,8 @@ #include <cstdio> #include <cstdlib> #include "log_event.h" +#include <list> +#include <algorithm> wsrep_t *wsrep = NULL; /* @@ -131,6 +133,8 @@ mysql_cond_t COND_wsrep_replaying; mysql_mutex_t LOCK_wsrep_slave_threads; mysql_mutex_t LOCK_wsrep_desync; mysql_mutex_t LOCK_wsrep_config_state; +mysql_mutex_t LOCK_wsrep_kill; +mysql_cond_t COND_wsrep_kill; int wsrep_replaying= 0; ulong wsrep_running_threads = 0; // # of currently running wsrep @@ -138,6 +142,7 @@ ulong wsrep_running_threads = 0; // # of currently running wsrep ulong wsrep_running_applier_threads = 0; // # of running applier threads ulong wsrep_running_rollbacker_threads = 0; // # of running // # rollbacker threads +ulong wsrep_running_killer_threads = 0; ulong my_bind_addr; #ifdef HAVE_PSI_INTERFACE @@ -145,11 +150,13 @@ PSI_mutex_key key_LOCK_wsrep_rollback, key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst, key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init, key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync, - key_LOCK_wsrep_config_state; + key_LOCK_wsrep_config_state, + key_LOCK_wsrep_kill; PSI_cond_key key_COND_wsrep_rollback, key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst, - key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread; + key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread, + key_COND_wsrep_kill; PSI_file_key key_file_wsrep_gra_log; @@ -164,7 +171,8 @@ static PSI_mutex_info wsrep_mutexes[]= { &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL}, - { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL} + { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_kill, "LOCK_wsrep_kill", PSI_FLAG_GLOBAL} }; static PSI_cond_info wsrep_conds[]= @@ -174,7 +182,8 @@ static PSI_cond_info wsrep_conds[]= { &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL}, { &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0}, { &key_COND_wsrep_rollback, "COND_wsrep_rollback", PSI_FLAG_GLOBAL}, - { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL} + { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}, + { &key_COND_wsrep_kill, "COND_wsrep_kill", PSI_FLAG_GLOBAL} }; static PSI_file_info wsrep_files[]= @@ -183,14 +192,15 @@ static PSI_file_info wsrep_files[]= }; PSI_thread_key key_wsrep_sst_joiner, key_wsrep_sst_donor, - key_wsrep_rollbacker, key_wsrep_applier; + key_wsrep_rollbacker, key_wsrep_applier, key_wsrep_killer; static PSI_thread_info wsrep_threads[]= { {&key_wsrep_sst_joiner, "wsrep_sst_joiner_thread", PSI_FLAG_GLOBAL}, {&key_wsrep_sst_donor, "wsrep_sst_donor_thread", PSI_FLAG_GLOBAL}, {&key_wsrep_rollbacker, "wsrep_rollbacker_thread", PSI_FLAG_GLOBAL}, - {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL} + {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL}, + {&key_wsrep_killer, "wsrep_killer_thread", PSI_FLAG_GLOBAL} }; #endif /* HAVE_PSI_INTERFACE */ @@ -237,6 +247,7 @@ wsp::Config_state *wsrep_config_state; // if there was no state gap on receiving first view event. static my_bool wsrep_startup = TRUE; +std::list< wsrep_kill_t > wsrep_kill_list; static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) { switch (level) { @@ -829,6 +840,8 @@ void wsrep_thr_init() mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_kill, &LOCK_wsrep_kill, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_kill, &COND_wsrep_kill, NULL); DBUG_VOID_RETURN; } @@ -865,6 +878,7 @@ void wsrep_init_startup (bool first) if (!wsrep_start_replication()) unireg_abort(1); wsrep_create_rollbacker(); + wsrep_create_killer(); wsrep_create_appliers(1); if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed @@ -906,6 +920,8 @@ void wsrep_thr_deinit() mysql_mutex_destroy(&LOCK_wsrep_slave_threads); mysql_mutex_destroy(&LOCK_wsrep_desync); mysql_mutex_destroy(&LOCK_wsrep_config_state); + mysql_mutex_destroy(&LOCK_wsrep_kill); + mysql_cond_destroy(&COND_wsrep_kill); delete wsrep_config_state; wsrep_config_state= 0; // Safety } @@ -1657,7 +1673,7 @@ static int wsrep_TOI_begin(THD *thd, const char *db_, const char *table_, if (wsrep_can_run_in_toi(thd, db_, table_, table_list) == false) { - WSREP_DEBUG("No TOI for %s", WSREP_QUERY(thd)); + WSREP_DEBUG("No TOI for %s", wsrep_thd_query(thd)); return 1; } @@ -2147,9 +2163,13 @@ pthread_handler_t start_wsrep_THD(void *arg) case WSREP_ROLLBACKER_THREAD: wsrep_running_rollbacker_threads++; break; + case WSREP_KILLER_THREAD: + wsrep_running_killer_threads++; + thd->wsrep_killer= true; + break; default: WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type); - break; + assert(0); } mysql_cond_broadcast(&COND_thread_count); @@ -2172,9 +2192,13 @@ pthread_handler_t start_wsrep_THD(void *arg) DBUG_ASSERT(wsrep_running_rollbacker_threads > 0); wsrep_running_rollbacker_threads--; break; + case WSREP_KILLER_THREAD: + DBUG_ASSERT(wsrep_running_killer_threads > 0); + wsrep_running_killer_threads--; + break; default: WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type); - break; + assert(0); } my_free(args); @@ -2416,7 +2440,11 @@ void wsrep_close_client_connections(my_bool wait_to_end, THD *except_caller_thd) } DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count)); - WSREP_DEBUG("waiting for client connections to close: %u", thread_count); + WSREP_DEBUG("Waiting for client connections to close: %u", thread_count); + WSREP_DEBUG("Waiting for rollbacker threads to close: %lu", wsrep_running_rollbacker_threads); + WSREP_DEBUG("Waiting for applier threads to close: %lu", wsrep_running_applier_threads); + WSREP_DEBUG("Waiting for killer threads to close: %lu", wsrep_running_killer_threads); + WSREP_DEBUG("Waiting for wsrep threads to close: %lu", wsrep_running_threads); while (wait_to_end && have_client_connections()) { @@ -2450,7 +2478,7 @@ void wsrep_close_threads(THD *thd) DBUG_PRINT("quit",("Informing thread %lld that it's time to die", (longlong) tmp->thread_id)); /* We skip slave threads & scheduler on this first loop through. */ - if (tmp->wsrep_applier && tmp != thd) + if ((tmp->wsrep_applier || tmp->wsrep_killer) && tmp != thd) { WSREP_DEBUG("closing wsrep thread %lld", (longlong) tmp->thread_id); wsrep_close_thread (tmp); @@ -2464,7 +2492,7 @@ void wsrep_wait_appliers_close(THD *thd) { /* Wait for wsrep appliers to gracefully exit */ mysql_mutex_lock(&LOCK_thread_count); - while (wsrep_running_threads > 1) + while (wsrep_running_threads > 2) // 1 is for rollbacker thread which needs to be killed explicitly. // This gotta be fixed in a more elegant manner if we gonna have arbitrary // number of non-applier wsrep threads. @@ -2738,9 +2766,12 @@ extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id) extern "C" void wsrep_thd_awake(THD *thd, my_bool signal) { + mysql_mutex_assert_owner(&thd->LOCK_thd_data); + mysql_mutex_assert_owner(&thd->LOCK_thd_kill); + if (signal) { - thd->awake(KILL_QUERY); + thd->awake_no_mutex(KILL_QUERY); } else { @@ -2748,6 +2779,9 @@ extern "C" void wsrep_thd_awake(THD *thd, my_bool signal) mysql_cond_broadcast(&COND_wsrep_replaying); mysql_mutex_unlock(&LOCK_wsrep_replaying); } + + mysql_mutex_unlock(&thd->LOCK_thd_kill); + mysql_mutex_unlock(&thd->LOCK_thd_data); } @@ -3001,3 +3035,37 @@ bool wsrep_node_is_synced() { return (WSREP_ON) ? (wsrep_config_state->get_status() == 4) : false; } + +bool wsrep_enqueue_background_kill(wsrep_kill_t item) +{ + std::list< wsrep_kill_t >::iterator it; + bool inserted= false; + + mysql_mutex_lock(&LOCK_wsrep_kill); + + for (it = wsrep_kill_list.begin(); it != wsrep_kill_list.end(); it++) + { + if ((*it).victim_thd_id == item.victim_thd_id) + break; + } + + if(it != wsrep_kill_list.end()) + { + WSREP_DEBUG("Thread: %lu already on kill list", item.victim_thd_id); + } + else + { + wsrep_kill_list.push_back(item); + mysql_cond_signal(&COND_wsrep_kill); + inserted= true; + } + + mysql_mutex_unlock(&LOCK_wsrep_kill); + return inserted; +} + +void wsrep_LOCK(THD* thd) +{ + mysql_mutex_lock(&thd->LOCK_thd_data); + mysql_mutex_lock(&thd->LOCK_thd_kill); +} diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 55ea032e835..6353e936f01 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -18,9 +18,20 @@ #ifndef WSREP_MYSQLD_H #define WSREP_MYSQLD_H +#include <my_config.h> +#include <stdint.h> #include <mysql/plugin.h> #include <mysql/service_wsrep.h> +typedef struct wsrep_kill { + unsigned long victim_thd_id; + unsigned long bf_thd_id; + uint64_t victim_trx_id; + uint64_t bf_trx_id; + bool signal; + bool wait_lock; +} wsrep_kill_t; + #ifdef WITH_WSREP typedef struct st_mysql_show_var SHOW_VAR; @@ -92,6 +103,7 @@ extern my_bool wsrep_slave_UK_checks; extern ulong wsrep_running_threads; extern ulong wsrep_running_applier_threads; extern ulong wsrep_running_rollbacker_threads; +extern ulong wsrep_running_killer_threads; extern bool wsrep_new_cluster; extern bool wsrep_gtid_mode; extern uint32 wsrep_gtid_domain_id; @@ -223,8 +235,6 @@ void wsrep_log(void (*fun)(const char *, ...), const char *format, ...); #define WSREP_PROVIDER_EXISTS \ (wsrep_provider && strncasecmp(wsrep_provider, WSREP_NONE, FN_REFLEN)) -#define WSREP_QUERY(thd) (thd->query()) - extern my_bool wsrep_ready_get(); extern void wsrep_ready_wait(); @@ -254,6 +264,8 @@ extern mysql_cond_t COND_wsrep_replaying; extern mysql_mutex_t LOCK_wsrep_slave_threads; extern mysql_mutex_t LOCK_wsrep_desync; extern mysql_mutex_t LOCK_wsrep_config_state; +extern mysql_mutex_t LOCK_wsrep_kill; +extern mysql_cond_t COND_wsrep_kill; extern wsrep_aborting_thd_t wsrep_aborting_thd; extern my_bool wsrep_emulate_bin_log; extern int wsrep_to_isolation; @@ -278,6 +290,8 @@ extern PSI_mutex_key key_LOCK_wsrep_replaying; extern PSI_cond_key key_COND_wsrep_replaying; extern PSI_mutex_key key_LOCK_wsrep_slave_threads; extern PSI_mutex_key key_LOCK_wsrep_desync; +extern PSI_mutex_key key_LOCK_wsrep_kill; +extern PSI_cond_key key_COND_wsrep_kill; extern PSI_file_key key_file_wsrep_gra_log; @@ -285,6 +299,7 @@ extern PSI_thread_key key_wsrep_sst_joiner; extern PSI_thread_key key_wsrep_sst_donor; extern PSI_thread_key key_wsrep_rollbacker; extern PSI_thread_key key_wsrep_applier; +extern PSI_thread_key key_wsrep_killer; #endif /* HAVE_PSI_INTERFACE */ @@ -311,7 +326,8 @@ void thd_binlog_trx_reset(THD * thd); enum wsrep_thread_type { WSREP_APPLIER_THREAD=1, - WSREP_ROLLBACKER_THREAD=2 + WSREP_ROLLBACKER_THREAD=2, + WSREP_KILLER_THREAD=3 }; typedef void (*wsrep_thd_processor_fun)(THD *); diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc index 9515fd550f2..77bd32a97d2 100644 --- a/sql/wsrep_thd.cc +++ b/sql/wsrep_thd.cc @@ -22,11 +22,16 @@ //#include "global_threads.h" // LOCK_thread_count, etc. #include "sql_base.h" // close_thread_tables() #include "mysqld.h" // start_wsrep_THD(); +#include "sql_show.h" // find_thread_by_id #include "slave.h" // opt_log_slave_updates #include "rpl_filter.h" #include "rpl_rli.h" #include "rpl_mi.h" +#include "debug_sync.h" +#include <list> + +extern std::list< wsrep_kill_t > wsrep_kill_list; #if (__LP64__) static volatile int64 wsrep_bf_aborts_counter(0); @@ -280,7 +285,7 @@ void wsrep_replay_sp_transaction(THD* thd) WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s", rcode, (thd->db.str ? thd->db.str : "(null)"), - WSREP_QUERY(thd)); + wsrep_thd_query(thd)); /* we're now in inconsistent state, must abort */ mysql_mutex_unlock(&thd->LOCK_thd_data); unireg_abort(1); @@ -528,12 +533,27 @@ static bool create_wsrep_THD(wsrep_thread_args* args, bool thread_count_lock) mysql_mutex_lock(&LOCK_thread_count); ulong old_wsrep_running_threads= wsrep_running_threads; + PSI_thread_key key; - DBUG_ASSERT(args->thread_type == WSREP_APPLIER_THREAD || - args->thread_type == WSREP_ROLLBACKER_THREAD); +#ifdef HAVE_PSI_INTERFACE + switch(args->thread_type) + { + case WSREP_APPLIER_THREAD: + key = key_wsrep_applier; + break; + case WSREP_ROLLBACKER_THREAD: + key = key_wsrep_rollbacker; + break; + case WSREP_KILLER_THREAD: + key = key_wsrep_killer; + break; + default: + WSREP_ERROR("Incorrect thread type %d", args->thread_type); + assert(0); + } +#endif /* HAVE_PSI_INTERFACE */ - bool res= mysql_thread_create(args->thread_type == WSREP_APPLIER_THREAD - ? key_wsrep_applier : key_wsrep_rollbacker, + bool res= mysql_thread_create(key, &args->thread_id, &connection_attrib, start_wsrep_THD, (void*)args); @@ -561,6 +581,348 @@ static bool create_wsrep_THD(wsrep_thread_args* args, bool thread_count_lock) return res; } +static void wsrep_abort_slave_trx(long long bf_seqno, long long victim_seqno) +{ + WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be " + "caused by:\n\t" + "1) unsupported configuration options combination, please check documentation.\n\t" + "2) a bug in the code.\n\t" + "3) a database corruption.\n Node consistency compromized, " + "need to abort. Restart the node to resync with cluster.", + bf_seqno, victim_seqno); + abort(); +} + +static int wsrep_kill(wsrep_kill_t* item) +{ + bool signal= item->signal; + unsigned long long victim_trx_id= static_cast<unsigned long long>(item->victim_trx_id); + unsigned long long bf_trx_id= static_cast<unsigned long long>(item->bf_trx_id); + + // Note that find_thread_by_id will acquire LOCK_thd_kill mutex + // for thd if it's found + THD* bf_thd= find_thread_by_id(item->bf_thd_id, false, false); + + if (!bf_thd) + { + WSREP_ERROR("BF thread: %lu not found", item->bf_thd_id); + assert(0); + } + + long long bf_seqno= wsrep_thd_trx_seqno(bf_thd); + + WSREP_DEBUG("Aborter %s trx_id: %llu thread: %ld " + "seqno: %lld query_state: %s conflict_state: %s query: %s", + wsrep_thd_is_BF(bf_thd, false) ? "BF" : "normal", + bf_trx_id, + item->bf_thd_id, + bf_seqno, + wsrep_thd_query_state_str(bf_thd), + wsrep_thd_conflict_state_str(bf_thd), + wsrep_thd_query(bf_thd)); + + // Note that we need to release LOCK_thd_kill mutex from BF thread + // to obey safe mutex ordering of LOCK_thread_count -> LOCK_thd_data + // that both are taken on find_thread_by_id below + mysql_mutex_unlock(&bf_thd->LOCK_thd_kill); + + THD* thd= find_thread_by_id(item->victim_thd_id, false, true); + + if (!thd) + { + WSREP_DEBUG("Victim thread: %lu not found", item->victim_thd_id); + return(0); + } + + WSREP_LOG_CONFLICT(bf_thd, thd, TRUE); + + unsigned long victim_thread= item->victim_thd_id; + long long victim_seqno= wsrep_thd_trx_seqno(thd); + + WSREP_DEBUG("Victim %s trx_id: %llu thread: %ld " + "seqno: %lld query_state: %s conflict_state: %s query: %s", + wsrep_thd_is_BF(thd, false) ? "BF" : "normal", + victim_trx_id, + victim_thread, + victim_seqno, + wsrep_thd_query_state_str(thd), + wsrep_thd_conflict_state_str(thd), + wsrep_thd_query(thd)); + + if (wsrep_thd_query_state(thd) == QUERY_EXITING) + { + WSREP_DEBUG("Victim query state QUERY_EXITING trx: %llu" + " thread: %lu", + victim_trx_id, + victim_thread); + mysql_mutex_unlock(&thd->LOCK_thd_data); + mysql_mutex_unlock(&thd->LOCK_thd_kill); + return(0); + } + + if (wsrep_thd_exec_mode(thd) != LOCAL_STATE) + { + WSREP_DEBUG("Victim withdraw of non local for BF trx: %llu " + ", thread: %lu exec_mode: %s", + victim_trx_id, + victim_thread, + wsrep_thd_exec_mode_str(thd)); + } + + switch (wsrep_thd_get_conflict_state(thd)) + { + case NO_CONFLICT: + WSREP_DEBUG("Victim thread: %lu trx: %llu in NO_CONFLICT state", + victim_thread, + victim_trx_id); + wsrep_thd_set_conflict_state(thd, MUST_ABORT); + break; + case MUST_ABORT: + WSREP_DEBUG("Victim thread: %lu trx: %llu in MUST_ABORT state", + victim_thread, + victim_trx_id); + wsrep_thd_awake(thd, signal); + return(0); + break; + case ABORTED: + case ABORTING: // fall through + default: + WSREP_DEBUG("Victim thread: %lu trx: %llu in state: %s", + victim_thread, + victim_trx_id, + wsrep_thd_conflict_state_str(thd)); + mysql_mutex_unlock(&thd->LOCK_thd_data); + mysql_mutex_unlock(&thd->LOCK_thd_kill); + return(0); + break; + } + + switch (wsrep_thd_query_state(thd)) + { + case QUERY_COMMITTING: + { + enum wsrep_status rcode=WSREP_OK; + + WSREP_DEBUG("Victim kill trx QUERY_COMMITTING state thread: %ld trx: %llu", + victim_thread, + victim_trx_id); + + if (wsrep_thd_exec_mode(thd) == REPL_RECV) + { + WSREP_DEBUG("Victim REPL_RECV abort slave thread: %ld trx: %llu" + " bf_seqno: %lld victim_seqno: %lld", + victim_thread, + victim_trx_id, + bf_seqno, + victim_seqno); + + wsrep_abort_slave_trx(bf_seqno, victim_seqno); + } + else + { + wsrep_t *wsrep= get_wsrep(); + + rcode= wsrep->abort_pre_commit(wsrep, bf_seqno, + (wsrep_trx_id_t)wsrep_thd_ws_handle(thd)->trx_id); + + switch (rcode) + { + case WSREP_WARNING: + { + WSREP_DEBUG("Victim cancel commit warning thread: %lu trx: %llu", + victim_thread, + victim_trx_id); + + wsrep_thd_awake(thd, signal); + return(1); + break; + } + case WSREP_OK: + break; + default: + { + WSREP_ERROR("Victim cancel commit bad commit exit thread: " + "%lu trx: %llu rcode: %d ", + victim_thread, + victim_trx_id, + rcode); + /* unable to interrupt, must abort */ + /* note: kill_mysql() will block, if we cannot. + * kill the lock holder first. */ + abort(); + break; + } + } + } + + wsrep_thd_awake(thd, signal); + break; + } + case QUERY_EXEC: + { + /* it is possible that victim trx is itself waiting for some + * other lock. We need to cancel this waiting */ + WSREP_DEBUG("Victim kill trx QUERY_EXEC state thread: %ld trx: %llu", + victim_thread, victim_trx_id); + + bool wait_lock= item->wait_lock; + + if (wait_lock) + { + WSREP_DEBUG("Victim thread: %lu trx: %llu has lock wait flag", + victim_thread, + victim_trx_id); + + wsrep_thd_awake(thd, signal); + } + else + { + /* Abort currently executing query */ + WSREP_DEBUG("Kill query for victim thread: %lu trx: %llu", + victim_thread, + victim_trx_id); + + wsrep_thd_awake(thd, signal); + + /* for BF thd, we need to prevent him from committing */ + if (wsrep_thd_exec_mode(thd) == REPL_RECV) + { + WSREP_DEBUG("Victim REPL_RECV abort slave for thread: " + "%lu trx: %llu" + " bf_seqno: %lld victim_seqno: %lld", + victim_thread, + victim_trx_id, + bf_seqno, + victim_seqno); + + wsrep_abort_slave_trx(bf_seqno, victim_seqno); + } + } + break; + } + case QUERY_IDLE: + { + WSREP_DEBUG("Victim kill trx QUERY_IDLE state thread: %ld trx: %llu", + victim_thread, + victim_trx_id); + + if (wsrep_thd_exec_mode(thd) == REPL_RECV) + { + WSREP_DEBUG("Victim REPL_RECV kill BF IDLE, thread: %ld trx: " + "%llu bf_seqno: %lld victim_seqno: %lld", + victim_thread, + victim_trx_id, + bf_seqno, + victim_seqno); + + wsrep_thd_UNLOCK(thd); + wsrep_abort_slave_trx(bf_seqno, victim_seqno); + return(0); + } + + /* This will lock thd from proceeding after net_read() */ + wsrep_thd_set_conflict_state(thd, ABORTING); + + wsrep_lock_rollback(); + + if (wsrep_aborting_thd_contains(thd)) + { + WSREP_WARN("Victim is duplicate thd aborter thread: %ld trx: %llu", + victim_thread, + victim_trx_id); + } + else + { + wsrep_aborting_thd_enqueue(thd); + WSREP_DEBUG("Enqueuing victim thread: %ld trx: %llu for abort", + victim_thread, + victim_trx_id); + } + + wsrep_unlock_rollback(); + mysql_mutex_unlock(&thd->LOCK_thd_data); + mysql_mutex_unlock(&thd->LOCK_thd_kill); + + break; + } + default: + { + WSREP_WARN("Victim thread: %ld trx: %llu in bad wsrep query state: %s", + victim_thread, + victim_trx_id, + wsrep_thd_query_state_str(thd)); + + assert(0); + break; + } + } + + return(0); +} + +static void wsrep_process_kill(THD *thd) +{ + DBUG_ENTER("wsrep_process_kill"); + + mysql_mutex_lock(&LOCK_wsrep_kill); + + WSREP_DEBUG("WSREP killer thread started"); + + while (thd->killed == NOT_KILLED) + { + thd_proc_info(thd, "wsrep killer idle"); + thd->mysys_var->current_mutex= &LOCK_wsrep_kill; + thd->mysys_var->current_cond= &COND_wsrep_kill; + + mysql_cond_wait(&COND_wsrep_kill,&LOCK_wsrep_kill); + + WSREP_DEBUG("WSREP killer thread wakes for signal"); + + mysql_mutex_lock(&thd->mysys_var->mutex); + thd_proc_info(thd, "wsrep killer active"); + thd->mysys_var->current_mutex= 0; + thd->mysys_var->current_cond= 0; + mysql_mutex_unlock(&thd->mysys_var->mutex); + + /* process all entries in the queue */ + while (!wsrep_kill_list.empty()) + { + wsrep_kill_t to_be_killed= wsrep_kill_list.front(); + // Release list mutex while we kill one thread + mysql_mutex_unlock(&LOCK_wsrep_kill); + wsrep_kill(&to_be_killed); + mysql_mutex_lock(&LOCK_wsrep_kill); + wsrep_kill_list.pop_front(); + } + } + + assert(wsrep_kill_list.empty()); + + mysql_mutex_unlock(&LOCK_wsrep_kill); + sql_print_information("WSREP: killer thread exiting"); + DBUG_PRINT("wsrep",("wsrep killer thread exiting")); + DBUG_VOID_RETURN; +} + + +void wsrep_create_killer() +{ + wsrep_thread_args* arg; + if((arg = (wsrep_thread_args*)my_malloc(sizeof(wsrep_thread_args), MYF(0))) == NULL) { + WSREP_ERROR("Can't allocate memory for wsrep background killer thread"); + assert(0); + } + + arg->thread_type = WSREP_KILLER_THREAD; + arg->processor = wsrep_process_kill; + + if (create_wsrep_THD(arg, false)) { + WSREP_WARN("Can't create thread to manage wsrep background kill"); + my_free(arg); + return; + } +} + bool wsrep_create_appliers(long threads, bool thread_count_lock) { if (!wsrep_connected) @@ -656,7 +1018,7 @@ static void wsrep_rollback_process(THD *thd) mysql_mutex_unlock(&aborting->LOCK_thd_data); - set_current_thd(aborting); + set_current_thd(aborting); aborting->store_globals(); mysql_mutex_lock(&aborting->LOCK_thd_data); @@ -666,7 +1028,7 @@ static void wsrep_rollback_process(THD *thd) (longlong) aborting->real_id); mysql_mutex_unlock(&aborting->LOCK_thd_data); - set_current_thd(thd); + set_current_thd(thd); thd->store_globals(); mysql_mutex_lock(&LOCK_wsrep_rollback); diff --git a/sql/wsrep_thd.h b/sql/wsrep_thd.h index 10efcbefbf6..51700d31e63 100644 --- a/sql/wsrep_thd.h +++ b/sql/wsrep_thd.h @@ -29,6 +29,7 @@ void wsrep_replay_sp_transaction(THD* thd); void wsrep_replay_transaction(THD *thd); bool wsrep_create_appliers(long threads, bool thread_count_lock=false); void wsrep_create_rollbacker(); +void wsrep_create_killer(); int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal); diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc index f18dc565329..be3a55557e7 100644 --- a/sql/wsrep_var.cc +++ b/sql/wsrep_var.cc @@ -500,6 +500,7 @@ bool wsrep_cluster_address_update (sys_var *self, THD* thd, enum_var_type type) if (wsrep_start_replication()) { wsrep_create_rollbacker(); + wsrep_create_killer(); WSREP_DEBUG("Cluster address update creating %ld applier threads running %lu", wsrep_slave_threads, wsrep_running_applier_threads); wsrep_create_appliers(wsrep_slave_threads); |