diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/handler.cc | 1 | ||||
-rw-r--r-- | sql/mysqld.cc | 9 | ||||
-rw-r--r-- | sql/mysqld.h | 2 | ||||
-rw-r--r-- | sql/service_wsrep.cc | 34 | ||||
-rw-r--r-- | sql/slave.cc | 2 | ||||
-rw-r--r-- | sql/sql_class.cc | 27 | ||||
-rw-r--r-- | sql/sql_class.h | 13 | ||||
-rw-r--r-- | sql/sql_parse.cc | 42 | ||||
-rw-r--r-- | sql/sql_repl.cc | 4 | ||||
-rw-r--r-- | sql/sql_show.cc | 1 | ||||
-rw-r--r-- | sql/wsrep_client_service.cc | 7 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 109 | ||||
-rw-r--r-- | sql/wsrep_thd.cc | 34 |
13 files changed, 189 insertions, 96 deletions
diff --git a/sql/handler.cc b/sql/handler.cc index d49a695f197..afbf2df4adc 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -913,7 +913,6 @@ static my_bool kill_handlerton(THD *thd, plugin_ref plugin, { handlerton *hton= plugin_hton(plugin); - mysql_mutex_assert_owner(&thd->LOCK_thd_data); if (hton->kill_query && thd_get_ha_data(thd, hton)) hton->kill_query(hton, thd, *(enum thd_kill_levels *) level); return FALSE; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 072cd42cc23..3db7ca40704 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -2106,7 +2106,7 @@ static void clean_up_mutexes() ****************************************************************************/ #ifdef EMBEDDED_LIBRARY -void close_connection(THD *thd, uint sql_errno) +void close_connection(THD *thd, uint sql_errno, my_bool locked) { } #else @@ -2515,7 +2515,7 @@ static void network_init(void) For the connection that is doing shutdown, this is called twice */ -void close_connection(THD *thd, uint sql_errno) +void close_connection(THD *thd, uint sql_errno, my_bool locked) { int lvl= (thd->main_security_ctx.user ? 3 : 1); DBUG_ENTER("close_connection"); @@ -2531,7 +2531,10 @@ void close_connection(THD *thd, uint sql_errno) "This connection closed normally without" " authentication")); - thd->disconnect(); + if (locked) + thd->disconnect_mutexed(); + else + thd->disconnect(); MYSQL_CONNECTION_DONE((int) sql_errno, thd->thread_id); diff --git a/sql/mysqld.h b/sql/mysqld.h index 142d2dbafb0..afb5e9084d3 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -75,7 +75,7 @@ enum enum_slave_parallel_mode { /* Function prototypes */ void kill_mysql(THD *thd); -void close_connection(THD *thd, uint sql_errno= 0); +void close_connection(THD *thd, uint sql_errno= 0, my_bool locked=false); void handle_connection_in_main_thread(CONNECT *thd); void create_thread_to_handle_connection(CONNECT *connect); void unlink_thd(THD *thd); diff --git a/sql/service_wsrep.cc b/sql/service_wsrep.cc index 4016b0902c7..ca816cf1332 100644 --- a/sql/service_wsrep.cc +++ b/sql/service_wsrep.cc @@ -29,12 +29,14 @@ extern "C" my_bool wsrep_on(const THD *thd) extern "C" void wsrep_thd_LOCK(const THD *thd) { + mysql_mutex_lock(&thd->LOCK_thd_kill); mysql_mutex_lock(&thd->LOCK_thd_data); } extern "C" void wsrep_thd_UNLOCK(const THD *thd) { mysql_mutex_unlock(&thd->LOCK_thd_data); + mysql_mutex_unlock(&thd->LOCK_thd_kill); } extern "C" void wsrep_thd_kill_LOCK(const THD *thd) @@ -188,6 +190,9 @@ extern "C" void wsrep_handle_SR_rollback(THD *bf_thd, DBUG_ASSERT(wsrep_thd_is_SR(victim_thd)); if (!victim_thd || !wsrep_on(bf_thd)) return; + mysql_mutex_lock(&victim_thd->LOCK_thd_kill); + mysql_mutex_lock(&victim_thd->LOCK_thd_data); + WSREP_DEBUG("handle rollback, for deadlock: thd %llu trx_id %" PRIu64 " frags %zu conf %s", victim_thd->thread_id, victim_thd->wsrep_trx_id(), @@ -208,6 +213,10 @@ extern "C" void wsrep_handle_SR_rollback(THD *bf_thd, { wsrep_thd_self_abort(victim_thd); } + + mysql_mutex_unlock(&victim_thd->LOCK_thd_kill); + mysql_mutex_unlock(&victim_thd->LOCK_thd_data); + if (bf_thd) { wsrep_store_threadvars(bf_thd); @@ -217,16 +226,8 @@ extern "C" void wsrep_handle_SR_rollback(THD *bf_thd, extern "C" my_bool wsrep_thd_bf_abort(THD *bf_thd, THD *victim_thd, my_bool signal) { - DBUG_EXECUTE_IF("sync.before_wsrep_thd_abort", - { - const char act[]= - "now " - "SIGNAL sync.before_wsrep_thd_abort_reached " - "WAIT_FOR signal.before_wsrep_thd_abort"; - DBUG_ASSERT(!debug_sync_set_action(bf_thd, - STRING_WITH_LEN(act))); - };); - + mysql_mutex_assert_owner(&victim_thd->LOCK_thd_kill); + mysql_mutex_assert_owner(&victim_thd->LOCK_thd_data); my_bool ret= wsrep_bf_abort(bf_thd, victim_thd); /* Send awake signal if victim was BF aborted or does not @@ -235,26 +236,19 @@ extern "C" my_bool wsrep_thd_bf_abort(THD *bf_thd, THD *victim_thd, */ if ((ret || !wsrep_on(victim_thd)) && signal) { - mysql_mutex_assert_not_owner(&victim_thd->LOCK_thd_data); - mysql_mutex_assert_not_owner(&victim_thd->LOCK_thd_kill); - mysql_mutex_lock(&victim_thd->LOCK_thd_data); - if (victim_thd->wsrep_aborter && victim_thd->wsrep_aborter != bf_thd->thread_id) { WSREP_DEBUG("victim is killed already by %llu, skipping awake", victim_thd->wsrep_aborter); - mysql_mutex_unlock(&victim_thd->LOCK_thd_data); return false; } - mysql_mutex_lock(&victim_thd->LOCK_thd_kill); victim_thd->wsrep_aborter= bf_thd->thread_id; victim_thd->awake_no_mutex(KILL_QUERY); - mysql_mutex_unlock(&victim_thd->LOCK_thd_kill); - mysql_mutex_unlock(&victim_thd->LOCK_thd_data); - } else { - WSREP_DEBUG("wsrep_thd_bf_abort skipped awake"); } + else + WSREP_DEBUG("wsrep_thd_bf_abort skipped awake"); + return ret; } diff --git a/sql/slave.cc b/sql/slave.cc index fd66b0e1175..1da030084ef 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -1072,8 +1072,8 @@ terminate_slave_thread(THD *thd, int error __attribute__((unused)); DBUG_PRINT("loop", ("killing slave thread")); - mysql_mutex_lock(&thd->LOCK_thd_data); mysql_mutex_lock(&thd->LOCK_thd_kill); + mysql_mutex_lock(&thd->LOCK_thd_data); #ifndef DONT_USE_THR_ALARM /* Error codes from pthread_kill are: diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 687e027338f..62cfd22ec64 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -826,6 +826,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) mysql_mutex_init(key_LOCK_wakeup_ready, &LOCK_wakeup_ready, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_thd_kill, &LOCK_thd_kill, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wakeup_ready, &COND_wakeup_ready, 0); + mysql_mutex_record_order(&LOCK_thd_kill, &LOCK_thd_data); /* Variables with default values */ proc_info="login"; @@ -1998,13 +1999,13 @@ void THD::abort_current_cond_wait(bool force) the Vio might be disassociated concurrently. */ -void THD::disconnect() +void THD::disconnect_mutexed() { Vio *vio= NULL; - set_killed(KILL_CONNECTION); - - mysql_mutex_lock(&LOCK_thd_data); + mysql_mutex_assert_owner(&LOCK_thd_data); + mysql_mutex_assert_owner(&LOCK_thd_kill); + set_killed_no_mutex(KILL_CONNECTION); #ifdef SIGNAL_WITH_VIO_CLOSE /* @@ -2020,8 +2021,6 @@ void THD::disconnect() if (net.vio != vio) vio_close(net.vio); net.thd= 0; // Don't collect statistics - - mysql_mutex_unlock(&LOCK_thd_data); } @@ -2048,6 +2047,9 @@ bool THD::notify_shared_lock(MDL_context_owner *ctx_in_use, if (needs_thr_lock_abort) { + /* Protect thread from concurrent disconnect and delete */ + mysql_mutex_lock(&in_use->LOCK_thd_kill); + /* Protect thread from concurrent usage */ mysql_mutex_lock(&in_use->LOCK_thd_data); /* If not already dying */ if (in_use->killed != KILL_CONNECTION_HARD) @@ -2064,11 +2066,20 @@ bool THD::notify_shared_lock(MDL_context_owner *ctx_in_use, thread can see those instances (e.g. see partitioning code). */ if (!thd_table->needs_reopen()) - { signalled|= mysql_lock_abort_for_thread(this, thd_table); - } } +#ifdef WITH_WSREP + if (WSREP(this) && wsrep_thd_is_BF(this, false)) + { + WSREP_DEBUG("notify_shared_lock: BF thread %llu query %s" + " victim %llu query %s", + this->real_id, wsrep_thd_query(this), + in_use->real_id, wsrep_thd_query(in_use)); + wsrep_abort_thd(this, in_use, false); + } +#endif /* WITH_WSREP */ } + mysql_mutex_unlock(&in_use->LOCK_thd_kill); mysql_mutex_unlock(&in_use->LOCK_thd_data); } DBUG_RETURN(signalled); diff --git a/sql/sql_class.h b/sql/sql_class.h index 81452cda035..46735ce73c0 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -3470,8 +3470,8 @@ public: void awake_no_mutex(killed_state state_to_set); void awake(killed_state state_to_set) { - mysql_mutex_lock(&LOCK_thd_data); mysql_mutex_lock(&LOCK_thd_kill); + mysql_mutex_lock(&LOCK_thd_data); awake_no_mutex(state_to_set); mysql_mutex_unlock(&LOCK_thd_kill); mysql_mutex_unlock(&LOCK_thd_data); @@ -3479,8 +3479,15 @@ public: void abort_current_cond_wait(bool force); /** Disconnect the associated communication endpoint. */ - void disconnect(); - + inline void disconnect() + { + mysql_mutex_lock(&LOCK_thd_kill); + mysql_mutex_lock(&LOCK_thd_data); + disconnect_mutexed(); + mysql_mutex_unlock(&LOCK_thd_kill); + mysql_mutex_unlock(&LOCK_thd_data); + } + void disconnect_mutexed(); /* Allows this thread to serve as a target for others to schedule Async diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index af30aa6fde9..0a0aef66507 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -9216,7 +9216,8 @@ void add_join_natural(TABLE_LIST *a, TABLE_LIST *b, List<String> *using_fields, @param query_id If true, search by query_id instead of thread_id @return NULL - not found - pointer - thread found, and its LOCK_thd_kill is locked. + pointer - thread found, and its LOCK_thd_data and + LOCK_thd_kill is locked. */ struct find_thread_callback_arg @@ -9234,6 +9235,7 @@ static my_bool find_thread_callback(THD *thd, find_thread_callback_arg *arg) if (arg->id == (arg->query_id ? thd->query_id : (longlong) thd->thread_id)) { mysql_mutex_lock(&thd->LOCK_thd_kill); // Lock from delete + mysql_mutex_lock(&thd->LOCK_thd_data); // Lock from concurrent usage arg->thd= thd; return 1; } @@ -9248,7 +9250,6 @@ THD *find_thread_by_id(longlong id, bool query_id) return arg.thd; } - mysql_mutex_lock(&thd->LOCK_thd_data); /** kill one thread. @@ -9292,7 +9293,6 @@ kill_one_thread(THD *thd, longlong id, killed_state kill_signal, killed_type typ faster and do a harder kill than KILL_SYSTEM_THREAD; */ - mysql_mutex_lock(&tmp->LOCK_thd_data); // for various wsrep* checks below #ifdef WITH_WSREP if (((thd->security_ctx->master_access & PRIV_KILL_OTHER_USER_PROCESS) || thd->security_ctx->user_matches(tmp->security_ctx)) && @@ -9324,8 +9324,8 @@ kill_one_thread(THD *thd, longlong id, killed_state kill_signal, killed_type typ else error= (type == KILL_TYPE_QUERY ? ER_KILL_QUERY_DENIED_ERROR : ER_KILL_DENIED_ERROR); - mysql_mutex_unlock(&tmp->LOCK_thd_data); } + mysql_mutex_unlock(&tmp->LOCK_thd_data); mysql_mutex_unlock(&tmp->LOCK_thd_kill); DBUG_PRINT("exit", ("%d", error)); DBUG_RETURN(error); @@ -9438,6 +9438,18 @@ static void sql_kill(THD *thd, longlong id, killed_state state, killed_type type) { uint error; +#ifdef WITH_WSREP + if (WSREP(thd)) + { + WSREP_DEBUG("sql_kill called"); + if (thd->wsrep_applier) + { + WSREP_DEBUG("KILL in applying, bailing out here"); + return; + } + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) + } +#endif /* WITH_WSREP */ if (likely(!(error= kill_one_thread(thd, id, state, type)))) { if (!thd->killed) @@ -9447,6 +9459,11 @@ void sql_kill(THD *thd, longlong id, killed_state state, killed_type type) } else my_error(error, MYF(0), id); +#ifdef WITH_WSREP + return; + wsrep_error_label: + my_error(ER_CANNOT_USER, MYF(0), wsrep_thd_query(thd)); +#endif /* WITH_WSREP */ } @@ -9455,6 +9472,18 @@ sql_kill_user(THD *thd, LEX_USER *user, killed_state state) { uint error; ha_rows rows; +#ifdef WITH_WSREP + if (WSREP(thd)) + { + WSREP_DEBUG("sql_kill_user called"); + if (thd->wsrep_applier) + { + WSREP_DEBUG("KILL in applying, bailing out here"); + return; + } + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) + } +#endif /* WITH_WSREP */ if (likely(!(error= kill_threads_for_user(thd, user, state, &rows)))) my_ok(thd, rows); else @@ -9465,6 +9494,11 @@ sql_kill_user(THD *thd, LEX_USER *user, killed_state state) */ my_error(error, MYF(0), user->host.str, user->user.str); } +#ifdef WITH_WSREP + return; + wsrep_error_label: + my_error(ER_CANNOT_USER, MYF(0), user->user.str); +#endif /* WITH_WSREP */ } diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 09ad0e54627..b65efc1dc67 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -1,5 +1,5 @@ /* Copyright (c) 2000, 2018, Oracle and/or its affiliates. - Copyright (c) 2008, 2020, MariaDB Corporation + Copyright (c) 2008, 2021, MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -3498,8 +3498,8 @@ static my_bool kill_callback(THD *thd, kill_callback_arg *arg) thd->variables.server_id == arg->slave_server_id) { arg->thd= thd; - mysql_mutex_lock(&thd->LOCK_thd_data); mysql_mutex_lock(&thd->LOCK_thd_kill); // Lock from delete + mysql_mutex_lock(&thd->LOCK_thd_data); return 1; } return 0; diff --git a/sql/sql_show.cc b/sql/sql_show.cc index 6ff9ef8a810..1d4a73c4db8 100644 --- a/sql/sql_show.cc +++ b/sql/sql_show.cc @@ -3102,6 +3102,7 @@ int fill_show_explain(THD *thd, TABLE_LIST *table, COND *cond) if ((tmp= find_thread_by_id(thread_id))) { Security_context *tmp_sctx= tmp->security_ctx; + mysql_mutex_unlock(&tmp->LOCK_thd_data); /* If calling_user==NULL, calling thread has SUPER or PROCESS privilege, and so can do SHOW EXPLAIN on any user. diff --git a/sql/wsrep_client_service.cc b/sql/wsrep_client_service.cc index c2f49dbdd6b..6ab2834a219 100644 --- a/sql/wsrep_client_service.cc +++ b/sql/wsrep_client_service.cc @@ -68,20 +68,13 @@ bool Wsrep_client_service::interrupted( wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED) const { DBUG_ASSERT(m_thd == current_thd); - /* Underlying mutex in lock object points to LOCK_thd_data, which - protects m_thd->wsrep_trx(), LOCK_thd_kill protects m_thd->killed. - Locking order is: - 1) LOCK_thd_data - 2) LOCK_thd_kill */ mysql_mutex_assert_owner(static_cast<mysql_mutex_t*>(lock.mutex()->native())); - mysql_mutex_lock(&m_thd->LOCK_thd_kill); bool ret= (m_thd->killed != NOT_KILLED); if (ret) { WSREP_DEBUG("wsrep state is interrupted, THD::killed %d trx state %d", m_thd->killed, m_thd->wsrep_trx().state()); } - mysql_mutex_unlock(&m_thd->LOCK_thd_kill); return ret; } diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 7661dab2b8a..6a36826bca9 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -2547,7 +2547,9 @@ void wsrep_handle_mdl_conflict(MDL_context *requestor_ctx, request_thd, granted_thd); ticket->wsrep_report(wsrep_debug); + mysql_mutex_lock(&granted_thd->LOCK_thd_kill); mysql_mutex_lock(&granted_thd->LOCK_thd_data); + if (wsrep_thd_is_toi(granted_thd) || wsrep_thd_is_applying(granted_thd)) { @@ -2556,13 +2558,15 @@ void wsrep_handle_mdl_conflict(MDL_context *requestor_ctx, WSREP_DEBUG("BF thread waiting for SR in aborting state"); ticket->wsrep_report(wsrep_debug); mysql_mutex_unlock(&granted_thd->LOCK_thd_data); + mysql_mutex_unlock(&granted_thd->LOCK_thd_kill); } else if (wsrep_thd_is_SR(granted_thd) && !wsrep_thd_is_SR(request_thd)) { - WSREP_MDL_LOG(INFO, "MDL conflict, DDL vs SR", + WSREP_MDL_LOG(INFO, "MDL conflict, DDL vs SR", schema, schema_len, request_thd, granted_thd); - mysql_mutex_unlock(&granted_thd->LOCK_thd_data); wsrep_abort_thd(request_thd, granted_thd, 1); + mysql_mutex_unlock(&granted_thd->LOCK_thd_data); + mysql_mutex_unlock(&granted_thd->LOCK_thd_kill); } else { @@ -2570,6 +2574,7 @@ void wsrep_handle_mdl_conflict(MDL_context *requestor_ctx, request_thd, granted_thd); ticket->wsrep_report(true); mysql_mutex_unlock(&granted_thd->LOCK_thd_data); + mysql_mutex_unlock(&granted_thd->LOCK_thd_kill); unireg_abort(1); } } @@ -2579,14 +2584,16 @@ void wsrep_handle_mdl_conflict(MDL_context *requestor_ctx, WSREP_DEBUG("BF thread waiting for FLUSH"); ticket->wsrep_report(wsrep_debug); mysql_mutex_unlock(&granted_thd->LOCK_thd_data); + mysql_mutex_unlock(&granted_thd->LOCK_thd_kill); } else if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE) { WSREP_DEBUG("DROP caused BF abort, conf %s", wsrep_thd_transaction_state_str(granted_thd)); ticket->wsrep_report(wsrep_debug); - mysql_mutex_unlock(&granted_thd->LOCK_thd_data); wsrep_abort_thd(request_thd, granted_thd, 1); + mysql_mutex_unlock(&granted_thd->LOCK_thd_data); + mysql_mutex_unlock(&granted_thd->LOCK_thd_kill); } else { @@ -2595,8 +2602,9 @@ void wsrep_handle_mdl_conflict(MDL_context *requestor_ctx, ticket->wsrep_report(wsrep_debug); if (granted_thd->wsrep_trx().active()) { - mysql_mutex_unlock(&granted_thd->LOCK_thd_data); wsrep_abort_thd(request_thd, granted_thd, 1); + mysql_mutex_unlock(&granted_thd->LOCK_thd_data); + mysql_mutex_unlock(&granted_thd->LOCK_thd_kill); } else { @@ -2604,10 +2612,11 @@ void wsrep_handle_mdl_conflict(MDL_context *requestor_ctx, Granted_thd is likely executing with wsrep_on=0. If the requesting thd is BF, BF abort and wait. */ - mysql_mutex_unlock(&granted_thd->LOCK_thd_data); - if (wsrep_thd_is_BF(request_thd, FALSE)) + if (wsrep_thd_is_BF(request_thd, false)) { ha_abort_transaction(request_thd, granted_thd, TRUE); + mysql_mutex_unlock(&granted_thd->LOCK_thd_data); + mysql_mutex_unlock(&granted_thd->LOCK_thd_kill); } else { @@ -2620,15 +2629,15 @@ void wsrep_handle_mdl_conflict(MDL_context *requestor_ctx, } } else - { mysql_mutex_unlock(&request_thd->LOCK_thd_data); - } } /**/ static bool abort_replicated(THD *thd) { bool ret_code= false; + mysql_mutex_assert_owner(&thd->LOCK_thd_data); + mysql_mutex_assert_owner(&thd->LOCK_thd_kill); if (thd->wsrep_trx().state() == wsrep::transaction::s_committing) { WSREP_DEBUG("aborting replicated trx: %llu", (ulonglong)(thd->real_id)); @@ -2640,29 +2649,35 @@ static bool abort_replicated(THD *thd) } /**/ -static inline bool is_client_connection(THD *thd) +static inline my_bool is_client_connection(THD *thd, my_bool *sync) { - return (thd->wsrep_client_thread && thd->variables.wsrep_on); + if (sync) + mysql_mutex_lock(&thd->LOCK_thd_data); + my_bool ret= (thd->wsrep_client_thread && thd->variables.wsrep_on); + if (sync) + mysql_mutex_unlock(&thd->LOCK_thd_data); + + return ret; } -static inline bool is_replaying_connection(THD *thd) +static inline my_bool is_replaying_connection(THD *thd, my_bool *sync) { - bool ret; - - mysql_mutex_lock(&thd->LOCK_thd_data); - ret= (thd->wsrep_trx().state() == wsrep::transaction::s_replaying) ? true : false; - mysql_mutex_unlock(&thd->LOCK_thd_data); + if (sync) + mysql_mutex_lock(&thd->LOCK_thd_data); + my_bool ret= ((thd->wsrep_trx().state() == wsrep::transaction::s_replaying) ? true : false); + if (sync) + mysql_mutex_unlock(&thd->LOCK_thd_data); return ret; } -static inline bool is_committing_connection(THD *thd) +static inline my_bool is_committing_connection(THD *thd, my_bool *sync) { - bool ret; - - mysql_mutex_lock(&thd->LOCK_thd_data); - ret= (thd->wsrep_trx().state() == wsrep::transaction::s_committing) ? true : false; - mysql_mutex_unlock(&thd->LOCK_thd_data); + if (sync) + mysql_mutex_lock(&thd->LOCK_thd_data); + my_bool ret= ((thd->wsrep_trx().state() == wsrep::transaction::s_committing) ? true : false); + if (sync) + mysql_mutex_lock(&thd->LOCK_thd_data); return ret; } @@ -2671,12 +2686,17 @@ static my_bool have_client_connections(THD *thd, void*) { DBUG_PRINT("quit",("Informing thread %lld that it's time to die", (longlong) thd->thread_id)); - if (is_client_connection(thd) && thd->killed == KILL_CONNECTION) + my_bool ret=false; + mysql_mutex_lock(&thd->LOCK_thd_kill); + mysql_mutex_lock(&thd->LOCK_thd_data); + if (is_client_connection(thd, NULL) && thd->killed == KILL_CONNECTION) { (void)abort_replicated(thd); - return 1; + ret= true; } - return 0; + mysql_mutex_unlock(&thd->LOCK_thd_data); + mysql_mutex_unlock(&thd->LOCK_thd_kill); + return ret; } static void wsrep_close_thread(THD *thd) @@ -2688,59 +2708,72 @@ static void wsrep_close_thread(THD *thd) mysql_mutex_unlock(&thd->LOCK_thd_kill); } -static my_bool have_committing_connections(THD *thd, void *) +static my_bool have_committing_connections(THD *thd, my_bool *sync) { - return is_client_connection(thd) && is_committing_connection(thd) ? 1 : 0; + my_bool *need_sync= sync ? NULL : (my_bool *)thd; + if (sync) + mysql_mutex_lock(&thd->LOCK_thd_data); + my_bool ret= (is_client_connection(thd, need_sync) && is_committing_connection(thd, need_sync) ? 1 : 0); + if (sync) + mysql_mutex_unlock(&thd->LOCK_thd_data); + return ret; } int wsrep_wait_committing_connections_close(int wait_time) { int sleep_time= 100; + my_bool sync=true; WSREP_DEBUG("wait for committing transaction to close: %d sleep: %d", wait_time, sleep_time); - while (server_threads.iterate(have_committing_connections) && wait_time > 0) + while (server_threads.iterate(have_committing_connections, &sync) && wait_time > 0) { WSREP_DEBUG("wait for committing transaction to close: %d", wait_time); my_sleep(sleep_time); wait_time -= sleep_time; } - return server_threads.iterate(have_committing_connections); + return server_threads.iterate(have_committing_connections, &sync); } static my_bool kill_all_threads(THD *thd, THD *caller_thd) { DBUG_PRINT("quit", ("Informing thread %lld that it's time to die", (longlong) thd->thread_id)); + mysql_mutex_lock(&thd->LOCK_thd_kill); + mysql_mutex_lock(&thd->LOCK_thd_data); /* We skip slave threads & scheduler on this first loop through. */ - if (is_client_connection(thd) && thd != caller_thd) + if (is_client_connection(thd, NULL) && thd != caller_thd) { - if (is_replaying_connection(thd)) - thd->set_killed(KILL_CONNECTION); + if (is_replaying_connection(thd,NULL)) + thd->set_killed_no_mutex(KILL_CONNECTION); else if (!abort_replicated(thd)) { /* replicated transactions must be skipped */ WSREP_DEBUG("closing connection %lld", (longlong) thd->thread_id); /* instead of wsrep_close_thread() we do now soft kill by THD::awake */ - thd->awake(KILL_CONNECTION); + thd->awake_no_mutex(KILL_CONNECTION); } } + mysql_mutex_unlock(&thd->LOCK_thd_data); + mysql_mutex_unlock(&thd->LOCK_thd_kill); return 0; } static my_bool kill_remaining_threads(THD *thd, THD *caller_thd) { -#ifndef __bsdi__ // Bug in BSDI kernel - if (is_client_connection(thd) && + mysql_mutex_lock(&thd->LOCK_thd_kill); + mysql_mutex_lock(&thd->LOCK_thd_data); + if (is_client_connection(thd,NULL) && !abort_replicated(thd) && - !is_replaying_connection(thd) && + !is_replaying_connection(thd,NULL) && thd_is_connection_alive(thd) && thd != caller_thd) { WSREP_INFO("killing local connection: %lld", (longlong) thd->thread_id); - close_connection(thd); + close_connection(thd, true); } -#endif + mysql_mutex_unlock(&thd->LOCK_thd_data); + mysql_mutex_unlock(&thd->LOCK_thd_kill); return 0; } diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc index dc6bb21dbd9..e60a2a693d1 100644 --- a/sql/wsrep_thd.cc +++ b/sql/wsrep_thd.cc @@ -1,4 +1,4 @@ -/* Copyright (C) 2013 Codership Oy <info@codership.com> +/* Copyright (C) 2013-2021 Codership Oy <info@codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -64,7 +64,7 @@ static void wsrep_replication_process(THD *thd, delete thd->wsrep_rgi->rli->mi; delete thd->wsrep_rgi->rli; - + thd->wsrep_rgi->cleanup_after_session(); delete thd->wsrep_rgi; thd->wsrep_rgi= NULL; @@ -314,8 +314,8 @@ int wsrep_abort_thd(THD *bf_thd_ptr, THD *victim_thd_ptr, my_bool signal) THD *victim_thd= (THD *) victim_thd_ptr; THD *bf_thd= (THD *) bf_thd_ptr; - mysql_mutex_lock(&victim_thd->LOCK_thd_data); - + mysql_mutex_assert_owner(&victim_thd->LOCK_thd_data); + mysql_mutex_assert_owner(&victim_thd->LOCK_thd_kill); /* Note that when you use RSU node is desynced from cluster, thus WSREP(thd) might not be true. */ @@ -327,16 +327,13 @@ int wsrep_abort_thd(THD *bf_thd_ptr, THD *victim_thd_ptr, my_bool signal) { WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ? (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id); - mysql_mutex_unlock(&victim_thd->LOCK_thd_data); ha_abort_transaction(bf_thd, victim_thd, signal); - mysql_mutex_lock(&victim_thd->LOCK_thd_data); } else { WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd); } - mysql_mutex_unlock(&victim_thd->LOCK_thd_data); DBUG_RETURN(1); } @@ -345,6 +342,9 @@ bool wsrep_bf_abort(THD* bf_thd, THD* victim_thd) WSREP_LOG_THD(bf_thd, "BF aborter before"); WSREP_LOG_THD(victim_thd, "victim before"); + mysql_mutex_assert_owner(&victim_thd->LOCK_thd_kill); + mysql_mutex_assert_owner(&victim_thd->LOCK_thd_data); + DBUG_EXECUTE_IF("sync.wsrep_bf_abort", { const char act[]= @@ -358,7 +358,7 @@ bool wsrep_bf_abort(THD* bf_thd, THD* victim_thd) if (WSREP(victim_thd) && !victim_thd->wsrep_trx().active()) { WSREP_DEBUG("wsrep_bf_abort, BF abort for non active transaction"); - switch (victim_thd->wsrep_trx().state()) + switch (victim_thd->wsrep_trx().state()) { case wsrep::transaction::s_aborting: /* fall through */ case wsrep::transaction::s_aborted: @@ -367,7 +367,13 @@ bool wsrep_bf_abort(THD* bf_thd, THD* victim_thd) default: break; } + + /* Test: galera_create_table_as_select. Here we enter wsrep-lib + were LOCK_thd_data will be acquired, thus we need to release it. + */ + mysql_mutex_unlock(&victim_thd->LOCK_thd_data); wsrep_start_transaction(victim_thd, victim_thd->wsrep_next_trx_id()); + mysql_mutex_lock(&victim_thd->LOCK_thd_data); } bool ret; @@ -375,12 +381,24 @@ bool wsrep_bf_abort(THD* bf_thd, THD* victim_thd) if (wsrep_thd_is_toi(bf_thd)) { + /* Here we enter wsrep-lib were LOCK_thd_data will be acquired, + thus we need to release it. */ + mysql_mutex_unlock(&victim_thd->LOCK_thd_data); ret= victim_thd->wsrep_cs().total_order_bf_abort(bf_seqno); + mysql_mutex_lock(&victim_thd->LOCK_thd_data); } else { + /* Test: mysql-wsrep-features#165. Here we enter wsrep-lib + were LOCK_thd_data will be acquired and later LOCK_thd_kill + thus we need to release them. */ + mysql_mutex_unlock(&victim_thd->LOCK_thd_data); + mysql_mutex_unlock(&victim_thd->LOCK_thd_kill); ret= victim_thd->wsrep_cs().bf_abort(bf_seqno); + mysql_mutex_lock(&victim_thd->LOCK_thd_kill); + mysql_mutex_lock(&victim_thd->LOCK_thd_data); } + if (ret) { wsrep_bf_aborts_counter++; |