diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/handler.cc | 2 | ||||
-rw-r--r-- | sql/mysqld.cc | 9 | ||||
-rw-r--r-- | sql/mysqld.h | 2 | ||||
-rw-r--r-- | sql/sql_class.cc | 32 | ||||
-rw-r--r-- | sql/sql_class.h | 11 | ||||
-rw-r--r-- | sql/sql_parse.cc | 34 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 134 | ||||
-rw-r--r-- | sql/wsrep_thd.cc | 9 |
8 files changed, 182 insertions, 51 deletions
diff --git a/sql/handler.cc b/sql/handler.cc index c0a810a72bc..8dc5fb20ba3 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -835,11 +835,9 @@ static my_bool kill_handlerton(THD *thd, plugin_ref plugin, { handlerton *hton= plugin_hton(plugin); - mysql_mutex_lock(&thd->LOCK_thd_data); if (hton->state == SHOW_OPTION_YES && hton->kill_query && thd_get_ha_data(thd, hton)) hton->kill_query(hton, thd, *(enum thd_kill_levels *) level); - mysql_mutex_unlock(&thd->LOCK_thd_data); return FALSE; } diff --git a/sql/mysqld.cc b/sql/mysqld.cc index f53bb9f7451..e8749556084 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -2412,7 +2412,7 @@ static void clean_up_mutexes() static void set_ports() { } -void close_connection(THD *thd, uint sql_errno) +void close_connection(THD *thd, uint sql_errno, my_bool locked) { } #else @@ -2889,7 +2889,7 @@ static void network_init(void) For the connection that is doing shutdown, this is called twice */ -void close_connection(THD *thd, uint sql_errno) +void close_connection(THD *thd, uint sql_errno, my_bool locked) { DBUG_ENTER("close_connection"); @@ -2899,7 +2899,10 @@ void close_connection(THD *thd, uint sql_errno) thd->print_aborted_warning(3, sql_errno ? ER_DEFAULT(sql_errno) : "CLOSE_CONNECTION"); - thd->disconnect(); + if (locked) + thd->disconnect_mutexed(); + else + thd->disconnect(); MYSQL_CONNECTION_DONE((int) sql_errno, thd->thread_id); diff --git a/sql/mysqld.h b/sql/mysqld.h index d40e1d170d0..3d3ed34f4d5 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -82,7 +82,7 @@ enum enum_slave_parallel_mode { /* Function prototypes */ void kill_mysql(THD *thd= 0); -void close_connection(THD *thd, uint sql_errno= 0); +void close_connection(THD *thd, uint sql_errno= 0, my_bool locked=false); void handle_connection_in_main_thread(CONNECT *thd); void create_thread_to_handle_connection(CONNECT *connect); void signal_thd_deleted(); diff --git a/sql/sql_class.cc b/sql/sql_class.cc index d3c090c3308..49e0cbcf383 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -1918,13 +1918,13 @@ void THD::awake_no_mutex(killed_state state_to_set) the Vio might be disassociated concurrently. */ -void THD::disconnect() +void THD::disconnect_mutexed() { Vio *vio= NULL; - set_killed(KILL_CONNECTION); - - mysql_mutex_lock(&LOCK_thd_data); + mysql_mutex_assert_owner(&LOCK_thd_data); + mysql_mutex_assert_owner(&LOCK_thd_kill); + set_killed_no_mutex(KILL_CONNECTION); #ifdef SIGNAL_WITH_VIO_CLOSE /* @@ -1940,8 +1940,6 @@ void THD::disconnect() if (net.vio != vio) vio_close(net.vio); net.thd= 0; // Don't collect statistics - - mysql_mutex_unlock(&LOCK_thd_data); } @@ -1977,7 +1975,10 @@ bool THD::notify_shared_lock(MDL_context_owner *ctx_in_use, if (needs_thr_lock_abort) { + /* Protect thread from concurrent usage */ mysql_mutex_lock(&in_use->LOCK_thd_data); + /* Protect thread from concurrent disconnect and delete */ + mysql_mutex_lock(&in_use->LOCK_thd_kill); /* If not already dying */ if (in_use->killed != KILL_CONNECTION_HARD) { @@ -1993,17 +1994,20 @@ bool THD::notify_shared_lock(MDL_context_owner *ctx_in_use, thread can see those instances (e.g. see partitioning code). */ if (!thd_table->needs_reopen()) - { signalled|= mysql_lock_abort_for_thread(this, thd_table); - if (WSREP(this) && wsrep_thd_is_BF(this, FALSE)) - { - WSREP_DEBUG("remove_table_from_cache: %llu", - (unsigned long long) this->real_id); - wsrep_abort_thd((void *)this, (void *)in_use, FALSE); - } - } } +#ifdef WITH_WSREP + if (WSREP(this) && wsrep_thd_is_BF(this, false)) + { + WSREP_DEBUG("notify_shared_lock: BF thread %llu query %s" + " victim %llu query %s", + this->real_id, wsrep_thd_query(this), + in_use->real_id, wsrep_thd_query(in_use)); + wsrep_abort_thd((void *)this, (void *)in_use, false); + } +#endif /* WITH_WSREP */ } + mysql_mutex_unlock(&in_use->LOCK_thd_kill); mysql_mutex_unlock(&in_use->LOCK_thd_data); } DBUG_RETURN(signalled); diff --git a/sql/sql_class.h b/sql/sql_class.h index 1df9a9dc718..c358d938f21 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -3334,8 +3334,15 @@ public: } /** Disconnect the associated communication endpoint. */ - void disconnect(); - + inline void disconnect() + { + mysql_mutex_lock(&LOCK_thd_data); + mysql_mutex_lock(&LOCK_thd_kill); + disconnect_mutexed(); + mysql_mutex_unlock(&LOCK_thd_kill); + mysql_mutex_unlock(&LOCK_thd_data); + } + void disconnect_mutexed(); /* Allows this thread to serve as a target for others to schedule Async diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 95c04321b4d..aa0b4a96327 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -9157,6 +9157,18 @@ static void sql_kill(THD *thd, longlong id, killed_state state, killed_type type) { uint error; +#ifdef WITH_WSREP + if (WSREP(thd)) + { + WSREP_DEBUG("sql_kill called"); + if (thd->wsrep_applier) + { + WSREP_DEBUG("KILL in applying, bailing out here"); + return; + } + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) + } +#endif /* WITH_WSREP */ if (likely(!(error= kill_one_thread(thd, id, state, type)))) { if (!thd->killed) @@ -9166,6 +9178,11 @@ void sql_kill(THD *thd, longlong id, killed_state state, killed_type type) } else my_error(error, MYF(0), id); +#ifdef WITH_WSREP + return; + wsrep_error_label: + my_error(ER_CANNOT_USER, MYF(0), wsrep_thd_query(thd)); +#endif /* WITH_WSREP */ } @@ -9174,6 +9191,18 @@ void sql_kill_user(THD *thd, LEX_USER *user, killed_state state) { uint error; ha_rows rows; +#ifdef WITH_WSREP + if (WSREP(thd)) + { + WSREP_DEBUG("sql_kill_user called"); + if (thd->wsrep_applier) + { + WSREP_DEBUG("KILL in applying, bailing out here"); + return; + } + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) + } +#endif /* WITH_WSREP */ if (likely(!(error= kill_threads_for_user(thd, user, state, &rows)))) my_ok(thd, rows); else @@ -9184,6 +9213,11 @@ void sql_kill_user(THD *thd, LEX_USER *user, killed_state state) */ my_error(error, MYF(0), user->host.str, user->user.str); } +#ifdef WITH_WSREP + return; + wsrep_error_label: + my_error(ER_CANNOT_USER, MYF(0), user->user.str); +#endif /* WITH_WSREP */ } diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 9f152d2a20c..71376e5a987 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -1,4 +1,4 @@ -/* Copyright 2008-2015 Codership Oy <http://www.codership.com> +/* Copyright 2008-2021 Codership Oy <http://www.codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -833,13 +833,28 @@ void wsrep_thr_init() DBUG_VOID_RETURN; } +/* This is wrapper for wsrep_break_lock in thr_lock.c */ +static int wsrep_thr_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal) +{ + THD* victim_thd= (THD *) victim_thd_ptr; + /* We need to lock THD::LOCK_thd_data to protect victim + from concurrent usage and THD::LOCK_thd_kill to protect + from disconnect or delete. */ + mysql_mutex_lock(&victim_thd->LOCK_thd_data); + mysql_mutex_lock(&victim_thd->LOCK_thd_kill); + int res= wsrep_abort_thd(bf_thd_ptr, victim_thd_ptr, signal); + mysql_mutex_unlock(&victim_thd->LOCK_thd_data); + mysql_mutex_unlock(&victim_thd->LOCK_thd_kill); + return res; +} + void wsrep_init_startup (bool first) { if (wsrep_init()) unireg_abort(1); wsrep_thr_lock_init( (wsrep_thd_is_brute_force_fun)wsrep_thd_is_BF, - (wsrep_abort_thd_fun)wsrep_abort_thd, + (wsrep_abort_thd_fun)wsrep_thr_abort_thd, wsrep_debug, wsrep_convert_LOCK_to_trx, (wsrep_on_fun)wsrep_on); @@ -1685,6 +1700,11 @@ static int wsrep_TOI_begin(THD *thd, const char *db_, const char *table_, case SQLCOM_DROP_TABLE: buf_err= wsrep_drop_table_query(thd, &buf, &buf_len); break; + case SQLCOM_KILL: + WSREP_DEBUG("KILL as TOI: %s", thd->query()); + buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), + &buf, &buf_len); + break; case SQLCOM_CREATE_ROLE: if (sp_process_definer(thd)) { @@ -2006,6 +2026,8 @@ bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx, ticket->wsrep_report(wsrep_debug); mysql_mutex_lock(&granted_thd->LOCK_thd_data); + mysql_mutex_lock(&granted_thd->LOCK_thd_kill); + if (granted_thd->wsrep_exec_mode == TOTAL_ORDER || granted_thd->wsrep_exec_mode == REPL_RECV) { @@ -2013,6 +2035,7 @@ bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx, request_thd, granted_thd); ticket->wsrep_report(true); mysql_mutex_unlock(&granted_thd->LOCK_thd_data); + mysql_mutex_unlock(&granted_thd->LOCK_thd_kill); ret= true; } else if (granted_thd->lex->sql_command == SQLCOM_FLUSH || @@ -2021,6 +2044,7 @@ bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx, WSREP_DEBUG("BF thread waiting for FLUSH"); ticket->wsrep_report(wsrep_debug); mysql_mutex_unlock(&granted_thd->LOCK_thd_data); + mysql_mutex_unlock(&granted_thd->LOCK_thd_kill); ret= false; } else @@ -2045,15 +2069,17 @@ bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx, ticket->wsrep_report(true); } - mysql_mutex_unlock(&granted_thd->LOCK_thd_data); + /* This will call wsrep_abort_transaction so we should hold + THD::LOCK_thd_data to protect victim from concurrent usage + and THD::LOCK_thd_kill to protect from disconnect or delete. */ wsrep_abort_thd((void *) request_thd, (void *) granted_thd, 1); + mysql_mutex_unlock(&granted_thd->LOCK_thd_data); + mysql_mutex_unlock(&granted_thd->LOCK_thd_kill); ret= false; } } else - { mysql_mutex_unlock(&request_thd->LOCK_thd_data); - } return ret; } @@ -2221,6 +2247,8 @@ error: static bool abort_replicated(THD *thd) { bool ret_code= false; + mysql_mutex_assert_owner(&thd->LOCK_thd_data); + mysql_mutex_assert_owner(&thd->LOCK_thd_kill); if (thd->wsrep_query_state== QUERY_COMMITTING) { WSREP_DEBUG("aborting replicated trx: %llu", (ulonglong)(thd->real_id)); @@ -2235,6 +2263,7 @@ static bool abort_replicated(THD *thd) /**/ static inline bool is_client_connection(THD *thd) { + mysql_mutex_assert_owner(&thd->LOCK_thd_data); return (thd->wsrep_client_thread && thd->variables.wsrep_on); } @@ -2243,9 +2272,8 @@ static inline bool is_replaying_connection(THD *thd) { bool ret; - mysql_mutex_lock(&thd->LOCK_thd_data); + mysql_mutex_assert_owner(&thd->LOCK_thd_data); ret= (thd->wsrep_conflict_state == REPLAYING) ? true : false; - mysql_mutex_unlock(&thd->LOCK_thd_data); return ret; } @@ -2255,9 +2283,8 @@ static inline bool is_committing_connection(THD *thd) { bool ret; - mysql_mutex_lock(&thd->LOCK_thd_data); + mysql_mutex_assert_owner(&thd->LOCK_thd_data); ret= (thd->wsrep_query_state == QUERY_COMMITTING) ? true : false; - mysql_mutex_unlock(&thd->LOCK_thd_data); return ret; } @@ -2270,13 +2297,21 @@ static bool have_client_connections() I_List_iterator<THD> it(threads); while ((tmp=it++)) { + /* Protect thread from concurrent usage */ + mysql_mutex_lock(&tmp->LOCK_thd_data); + /* Protect thread from disconnect or delete */ + mysql_mutex_lock(&tmp->LOCK_thd_kill); DBUG_PRINT("quit",("Informing thread %lld that it's time to die", (longlong) tmp->thread_id)); if (is_client_connection(tmp) && tmp->killed == KILL_CONNECTION) { (void)abort_replicated(tmp); + mysql_mutex_unlock(&tmp->LOCK_thd_data); + mysql_mutex_unlock(&tmp->LOCK_thd_kill); return true; } + mysql_mutex_unlock(&tmp->LOCK_thd_data); + mysql_mutex_unlock(&tmp->LOCK_thd_kill); } return false; } @@ -2308,14 +2343,26 @@ static my_bool have_committing_connections() I_List_iterator<THD> it(threads); while ((tmp=it++)) { + /* Protect from concurrent usage */ + mysql_mutex_lock(&tmp->LOCK_thd_data); + /* Protect from disconnect or delete */ + mysql_mutex_lock(&tmp->LOCK_thd_kill); if (!is_client_connection(tmp)) + { + mysql_mutex_unlock(&tmp->LOCK_thd_data); + mysql_mutex_unlock(&tmp->LOCK_thd_kill); continue; + } if (is_committing_connection(tmp)) { mysql_mutex_unlock(&LOCK_thread_count); + mysql_mutex_unlock(&tmp->LOCK_thd_data); + mysql_mutex_unlock(&tmp->LOCK_thd_kill); return TRUE; } + mysql_mutex_unlock(&tmp->LOCK_thd_data); + mysql_mutex_unlock(&tmp->LOCK_thd_kill); } mysql_mutex_unlock(&LOCK_thread_count); return FALSE; @@ -2358,32 +2405,54 @@ void wsrep_close_client_connections(my_bool wait_to_end, THD *except_caller_thd) { DBUG_PRINT("quit",("Informing thread %lld that it's time to die", (longlong) tmp->thread_id)); + /* Protect from concurrent usage */ + mysql_mutex_lock(&tmp->LOCK_thd_data); + /* Protect from disconnect or delete */ + mysql_mutex_lock(&tmp->LOCK_thd_kill); /* We skip slave threads & scheduler on this first loop through. */ if (!is_client_connection(tmp)) + { + mysql_mutex_unlock(&tmp->LOCK_thd_data); + mysql_mutex_unlock(&tmp->LOCK_thd_kill); continue; + } if (tmp == except_caller_thd) { DBUG_ASSERT(is_client_connection(tmp)); + mysql_mutex_unlock(&tmp->LOCK_thd_data); + mysql_mutex_unlock(&tmp->LOCK_thd_kill); continue; } if (is_replaying_connection(tmp)) { - tmp->set_killed(KILL_CONNECTION); + tmp->set_killed_no_mutex(KILL_CONNECTION); + mysql_mutex_unlock(&tmp->LOCK_thd_data); + mysql_mutex_unlock(&tmp->LOCK_thd_kill); continue; } - /* replicated transactions must be skipped */ + /* replicated transactions must be skipped and aborted + with wsrep_abort_thd. */ if (abort_replicated(tmp)) + { + mysql_mutex_unlock(&tmp->LOCK_thd_data); + mysql_mutex_unlock(&tmp->LOCK_thd_kill); continue; + } WSREP_DEBUG("closing connection %lld", (longlong) tmp->thread_id); /* - instead of wsrep_close_thread() we do now soft kill by THD::awake - */ - tmp->awake(KILL_CONNECTION); + instead of wsrep_close_thread() we do now soft kill by + THD::awake(). Here also victim needs to be protected from + concurrent usage or disconnect or delete. + */ + tmp->awake_no_mutex(KILL_CONNECTION); + + mysql_mutex_unlock(&tmp->LOCK_thd_data); + mysql_mutex_unlock(&tmp->LOCK_thd_kill); } mysql_mutex_unlock(&LOCK_thread_count); @@ -2398,16 +2467,22 @@ void wsrep_close_client_connections(my_bool wait_to_end, THD *except_caller_thd) I_List_iterator<THD> it2(threads); while ((tmp=it2++)) { -#ifndef __bsdi__ // Bug in BSDI kernel - if (is_client_connection(tmp) && - !abort_replicated(tmp) && - !is_replaying_connection(tmp) && - tmp != except_caller_thd) + /* Protect from concurrent usage */ + mysql_mutex_lock(&tmp->LOCK_thd_data); + /* Protect from disconnect or delete */ + mysql_mutex_lock(&tmp->LOCK_thd_kill); + if (is_client_connection(tmp)) { - WSREP_INFO("killing local connection: %lld", (longlong) tmp->thread_id); - close_connection(tmp,0); + if (!abort_replicated(tmp) && + !is_replaying_connection(tmp) && + tmp != except_caller_thd) + { + WSREP_INFO("killing local connection: %lld", (longlong) tmp->thread_id); + close_connection(tmp,0, true); + } } -#endif + mysql_mutex_unlock(&tmp->LOCK_thd_data); + mysql_mutex_unlock(&tmp->LOCK_thd_kill); } DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count)); @@ -2594,7 +2669,9 @@ extern "C" void wsrep_thd_set_query_state( void wsrep_thd_set_conflict_state(THD *thd, enum wsrep_conflict_state state) { - if (WSREP(thd)) thd->wsrep_conflict_state= state; + DBUG_ASSERT(thd); + mysql_mutex_assert_owner(&thd->LOCK_thd_data); + thd->wsrep_conflict_state= state; } @@ -2662,12 +2739,14 @@ wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd) void wsrep_thd_LOCK(THD *thd) { mysql_mutex_lock(&thd->LOCK_thd_data); + mysql_mutex_lock(&thd->LOCK_thd_kill); } void wsrep_thd_UNLOCK(THD *thd) { mysql_mutex_unlock(&thd->LOCK_thd_data); + mysql_mutex_unlock(&thd->LOCK_thd_kill); } @@ -2747,9 +2826,12 @@ extern "C" void wsrep_thd_awake(THD *thd, my_bool signal) { if (signal) { - mysql_mutex_lock(&thd->LOCK_thd_data); - thd->awake(KILL_QUERY); - mysql_mutex_unlock(&thd->LOCK_thd_data); + /* Here we should hold THD::LOCK_thd_data to + protect from concurrent usage and + THD::LOCK_thd_kill from disconnect or delete */ + mysql_mutex_assert_owner(&thd->LOCK_thd_data); + mysql_mutex_assert_owner(&thd->LOCK_thd_kill); + thd->awake_no_mutex(KILL_QUERY); } else { diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc index 6fd57edc692..662a17d4e4e 100644 --- a/sql/wsrep_thd.cc +++ b/sql/wsrep_thd.cc @@ -1,4 +1,4 @@ -/* Copyright (C) 2013 Codership Oy <info@codership.com> +/* Copyright (C) 2013-2021 Codership Oy <info@codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -803,10 +803,13 @@ my_bool wsrep_thd_is_local(void *thd_ptr, my_bool sync) int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal) { - THD *victim_thd = (THD *) victim_thd_ptr; - THD *bf_thd = (THD *) bf_thd_ptr; + THD *victim_thd= (THD *) victim_thd_ptr; + THD *bf_thd= (THD *) bf_thd_ptr; DBUG_ENTER("wsrep_abort_thd"); + mysql_mutex_assert_owner(&victim_thd->LOCK_thd_data); + mysql_mutex_assert_owner(&victim_thd->LOCK_thd_kill); + if ( (WSREP(bf_thd) || ( (WSREP_ON || bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU) && bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) && |