summaryrefslogtreecommitdiff
path: root/sql/wsrep_mysqld.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_mysqld.cc')
-rw-r--r--sql/wsrep_mysqld.cc134
1 files changed, 108 insertions, 26 deletions
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index 9f152d2a20c..71376e5a987 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -1,4 +1,4 @@
-/* Copyright 2008-2015 Codership Oy <http://www.codership.com>
+/* Copyright 2008-2021 Codership Oy <http://www.codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -833,13 +833,28 @@ void wsrep_thr_init()
DBUG_VOID_RETURN;
}
+/* This is wrapper for wsrep_break_lock in thr_lock.c */
+static int wsrep_thr_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
+{
+ THD* victim_thd= (THD *) victim_thd_ptr;
+ /* We need to lock THD::LOCK_thd_data to protect victim
+ from concurrent usage and THD::LOCK_thd_kill to protect
+ from disconnect or delete. */
+ mysql_mutex_lock(&victim_thd->LOCK_thd_data);
+ mysql_mutex_lock(&victim_thd->LOCK_thd_kill);
+ int res= wsrep_abort_thd(bf_thd_ptr, victim_thd_ptr, signal);
+ mysql_mutex_unlock(&victim_thd->LOCK_thd_data);
+ mysql_mutex_unlock(&victim_thd->LOCK_thd_kill);
+ return res;
+}
+
void wsrep_init_startup (bool first)
{
if (wsrep_init()) unireg_abort(1);
wsrep_thr_lock_init(
(wsrep_thd_is_brute_force_fun)wsrep_thd_is_BF,
- (wsrep_abort_thd_fun)wsrep_abort_thd,
+ (wsrep_abort_thd_fun)wsrep_thr_abort_thd,
wsrep_debug, wsrep_convert_LOCK_to_trx,
(wsrep_on_fun)wsrep_on);
@@ -1685,6 +1700,11 @@ static int wsrep_TOI_begin(THD *thd, const char *db_, const char *table_,
case SQLCOM_DROP_TABLE:
buf_err= wsrep_drop_table_query(thd, &buf, &buf_len);
break;
+ case SQLCOM_KILL:
+ WSREP_DEBUG("KILL as TOI: %s", thd->query());
+ buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(),
+ &buf, &buf_len);
+ break;
case SQLCOM_CREATE_ROLE:
if (sp_process_definer(thd))
{
@@ -2006,6 +2026,8 @@ bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
ticket->wsrep_report(wsrep_debug);
mysql_mutex_lock(&granted_thd->LOCK_thd_data);
+ mysql_mutex_lock(&granted_thd->LOCK_thd_kill);
+
if (granted_thd->wsrep_exec_mode == TOTAL_ORDER ||
granted_thd->wsrep_exec_mode == REPL_RECV)
{
@@ -2013,6 +2035,7 @@ bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
request_thd, granted_thd);
ticket->wsrep_report(true);
mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
+ mysql_mutex_unlock(&granted_thd->LOCK_thd_kill);
ret= true;
}
else if (granted_thd->lex->sql_command == SQLCOM_FLUSH ||
@@ -2021,6 +2044,7 @@ bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
WSREP_DEBUG("BF thread waiting for FLUSH");
ticket->wsrep_report(wsrep_debug);
mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
+ mysql_mutex_unlock(&granted_thd->LOCK_thd_kill);
ret= false;
}
else
@@ -2045,15 +2069,17 @@ bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
ticket->wsrep_report(true);
}
- mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
+ /* This will call wsrep_abort_transaction so we should hold
+ THD::LOCK_thd_data to protect victim from concurrent usage
+ and THD::LOCK_thd_kill to protect from disconnect or delete. */
wsrep_abort_thd((void *) request_thd, (void *) granted_thd, 1);
+ mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
+ mysql_mutex_unlock(&granted_thd->LOCK_thd_kill);
ret= false;
}
}
else
- {
mysql_mutex_unlock(&request_thd->LOCK_thd_data);
- }
return ret;
}
@@ -2221,6 +2247,8 @@ error:
static bool abort_replicated(THD *thd)
{
bool ret_code= false;
+ mysql_mutex_assert_owner(&thd->LOCK_thd_data);
+ mysql_mutex_assert_owner(&thd->LOCK_thd_kill);
if (thd->wsrep_query_state== QUERY_COMMITTING)
{
WSREP_DEBUG("aborting replicated trx: %llu", (ulonglong)(thd->real_id));
@@ -2235,6 +2263,7 @@ static bool abort_replicated(THD *thd)
/**/
static inline bool is_client_connection(THD *thd)
{
+ mysql_mutex_assert_owner(&thd->LOCK_thd_data);
return (thd->wsrep_client_thread && thd->variables.wsrep_on);
}
@@ -2243,9 +2272,8 @@ static inline bool is_replaying_connection(THD *thd)
{
bool ret;
- mysql_mutex_lock(&thd->LOCK_thd_data);
+ mysql_mutex_assert_owner(&thd->LOCK_thd_data);
ret= (thd->wsrep_conflict_state == REPLAYING) ? true : false;
- mysql_mutex_unlock(&thd->LOCK_thd_data);
return ret;
}
@@ -2255,9 +2283,8 @@ static inline bool is_committing_connection(THD *thd)
{
bool ret;
- mysql_mutex_lock(&thd->LOCK_thd_data);
+ mysql_mutex_assert_owner(&thd->LOCK_thd_data);
ret= (thd->wsrep_query_state == QUERY_COMMITTING) ? true : false;
- mysql_mutex_unlock(&thd->LOCK_thd_data);
return ret;
}
@@ -2270,13 +2297,21 @@ static bool have_client_connections()
I_List_iterator<THD> it(threads);
while ((tmp=it++))
{
+ /* Protect thread from concurrent usage */
+ mysql_mutex_lock(&tmp->LOCK_thd_data);
+ /* Protect thread from disconnect or delete */
+ mysql_mutex_lock(&tmp->LOCK_thd_kill);
DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
(longlong) tmp->thread_id));
if (is_client_connection(tmp) && tmp->killed == KILL_CONNECTION)
{
(void)abort_replicated(tmp);
+ mysql_mutex_unlock(&tmp->LOCK_thd_data);
+ mysql_mutex_unlock(&tmp->LOCK_thd_kill);
return true;
}
+ mysql_mutex_unlock(&tmp->LOCK_thd_data);
+ mysql_mutex_unlock(&tmp->LOCK_thd_kill);
}
return false;
}
@@ -2308,14 +2343,26 @@ static my_bool have_committing_connections()
I_List_iterator<THD> it(threads);
while ((tmp=it++))
{
+ /* Protect from concurrent usage */
+ mysql_mutex_lock(&tmp->LOCK_thd_data);
+ /* Protect from disconnect or delete */
+ mysql_mutex_lock(&tmp->LOCK_thd_kill);
if (!is_client_connection(tmp))
+ {
+ mysql_mutex_unlock(&tmp->LOCK_thd_data);
+ mysql_mutex_unlock(&tmp->LOCK_thd_kill);
continue;
+ }
if (is_committing_connection(tmp))
{
mysql_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&tmp->LOCK_thd_data);
+ mysql_mutex_unlock(&tmp->LOCK_thd_kill);
return TRUE;
}
+ mysql_mutex_unlock(&tmp->LOCK_thd_data);
+ mysql_mutex_unlock(&tmp->LOCK_thd_kill);
}
mysql_mutex_unlock(&LOCK_thread_count);
return FALSE;
@@ -2358,32 +2405,54 @@ void wsrep_close_client_connections(my_bool wait_to_end, THD *except_caller_thd)
{
DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
(longlong) tmp->thread_id));
+ /* Protect from concurrent usage */
+ mysql_mutex_lock(&tmp->LOCK_thd_data);
+ /* Protect from disconnect or delete */
+ mysql_mutex_lock(&tmp->LOCK_thd_kill);
/* We skip slave threads & scheduler on this first loop through. */
if (!is_client_connection(tmp))
+ {
+ mysql_mutex_unlock(&tmp->LOCK_thd_data);
+ mysql_mutex_unlock(&tmp->LOCK_thd_kill);
continue;
+ }
if (tmp == except_caller_thd)
{
DBUG_ASSERT(is_client_connection(tmp));
+ mysql_mutex_unlock(&tmp->LOCK_thd_data);
+ mysql_mutex_unlock(&tmp->LOCK_thd_kill);
continue;
}
if (is_replaying_connection(tmp))
{
- tmp->set_killed(KILL_CONNECTION);
+ tmp->set_killed_no_mutex(KILL_CONNECTION);
+ mysql_mutex_unlock(&tmp->LOCK_thd_data);
+ mysql_mutex_unlock(&tmp->LOCK_thd_kill);
continue;
}
- /* replicated transactions must be skipped */
+ /* replicated transactions must be skipped and aborted
+ with wsrep_abort_thd. */
if (abort_replicated(tmp))
+ {
+ mysql_mutex_unlock(&tmp->LOCK_thd_data);
+ mysql_mutex_unlock(&tmp->LOCK_thd_kill);
continue;
+ }
WSREP_DEBUG("closing connection %lld", (longlong) tmp->thread_id);
/*
- instead of wsrep_close_thread() we do now soft kill by THD::awake
- */
- tmp->awake(KILL_CONNECTION);
+ instead of wsrep_close_thread() we do now soft kill by
+ THD::awake(). Here also victim needs to be protected from
+ concurrent usage or disconnect or delete.
+ */
+ tmp->awake_no_mutex(KILL_CONNECTION);
+
+ mysql_mutex_unlock(&tmp->LOCK_thd_data);
+ mysql_mutex_unlock(&tmp->LOCK_thd_kill);
}
mysql_mutex_unlock(&LOCK_thread_count);
@@ -2398,16 +2467,22 @@ void wsrep_close_client_connections(my_bool wait_to_end, THD *except_caller_thd)
I_List_iterator<THD> it2(threads);
while ((tmp=it2++))
{
-#ifndef __bsdi__ // Bug in BSDI kernel
- if (is_client_connection(tmp) &&
- !abort_replicated(tmp) &&
- !is_replaying_connection(tmp) &&
- tmp != except_caller_thd)
+ /* Protect from concurrent usage */
+ mysql_mutex_lock(&tmp->LOCK_thd_data);
+ /* Protect from disconnect or delete */
+ mysql_mutex_lock(&tmp->LOCK_thd_kill);
+ if (is_client_connection(tmp))
{
- WSREP_INFO("killing local connection: %lld", (longlong) tmp->thread_id);
- close_connection(tmp,0);
+ if (!abort_replicated(tmp) &&
+ !is_replaying_connection(tmp) &&
+ tmp != except_caller_thd)
+ {
+ WSREP_INFO("killing local connection: %lld", (longlong) tmp->thread_id);
+ close_connection(tmp,0, true);
+ }
}
-#endif
+ mysql_mutex_unlock(&tmp->LOCK_thd_data);
+ mysql_mutex_unlock(&tmp->LOCK_thd_kill);
}
DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count));
@@ -2594,7 +2669,9 @@ extern "C" void wsrep_thd_set_query_state(
void wsrep_thd_set_conflict_state(THD *thd, enum wsrep_conflict_state state)
{
- if (WSREP(thd)) thd->wsrep_conflict_state= state;
+ DBUG_ASSERT(thd);
+ mysql_mutex_assert_owner(&thd->LOCK_thd_data);
+ thd->wsrep_conflict_state= state;
}
@@ -2662,12 +2739,14 @@ wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd)
void wsrep_thd_LOCK(THD *thd)
{
mysql_mutex_lock(&thd->LOCK_thd_data);
+ mysql_mutex_lock(&thd->LOCK_thd_kill);
}
void wsrep_thd_UNLOCK(THD *thd)
{
mysql_mutex_unlock(&thd->LOCK_thd_data);
+ mysql_mutex_unlock(&thd->LOCK_thd_kill);
}
@@ -2747,9 +2826,12 @@ extern "C" void wsrep_thd_awake(THD *thd, my_bool signal)
{
if (signal)
{
- mysql_mutex_lock(&thd->LOCK_thd_data);
- thd->awake(KILL_QUERY);
- mysql_mutex_unlock(&thd->LOCK_thd_data);
+ /* Here we should hold THD::LOCK_thd_data to
+ protect from concurrent usage and
+ THD::LOCK_thd_kill from disconnect or delete */
+ mysql_mutex_assert_owner(&thd->LOCK_thd_data);
+ mysql_mutex_assert_owner(&thd->LOCK_thd_kill);
+ thd->awake_no_mutex(KILL_QUERY);
}
else
{