summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/sql_class.h2
-rw-r--r--sql/sql_parse.cc4
-rw-r--r--sql/sql_plugin_services.ic4
-rw-r--r--sql/sql_show.h2
-rw-r--r--sql/wsrep_dummy.cc8
-rw-r--r--sql/wsrep_hton.cc8
-rw-r--r--sql/wsrep_mysqld.cc94
-rw-r--r--sql/wsrep_mysqld.h22
-rw-r--r--sql/wsrep_thd.cc376
-rw-r--r--sql/wsrep_thd.h1
-rw-r--r--sql/wsrep_var.cc1
11 files changed, 492 insertions, 30 deletions
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 5ced820a34d..2fe715d0c4a 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -4712,6 +4712,8 @@ public:
#ifdef WITH_WSREP
const bool wsrep_applier; /* dedicated slave applier thread */
+ bool wsrep_killer; /* dedicated background
+ kill thread */
bool wsrep_applier_closing; /* applier marked to close */
bool wsrep_client_thread; /* to identify client threads*/
bool wsrep_PA_safe;
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 6802816caaf..4d05201d9f0 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -8920,7 +8920,7 @@ void add_join_natural(TABLE_LIST *a, TABLE_LIST *b, List<String> *using_fields,
pointer - thread found, and its LOCK_thd_kill is locked.
*/
-THD *find_thread_by_id(longlong id, bool query_id)
+THD *find_thread_by_id(longlong id, bool query_id, bool lock_thd_data)
{
THD *tmp;
mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
@@ -8931,6 +8931,8 @@ THD *find_thread_by_id(longlong id, bool query_id)
continue;
if (id == (query_id ? tmp->query_id : (longlong) tmp->thread_id))
{
+ // Lock from concurrent data change if requested
+ if (lock_thd_data) mysql_mutex_lock(&tmp->LOCK_thd_data);
mysql_mutex_lock(&tmp->LOCK_thd_kill); // Lock from delete
break;
}
diff --git a/sql/sql_plugin_services.ic b/sql/sql_plugin_services.ic
index 7c394b57c3e..64e0d2cb9f3 100644
--- a/sql/sql_plugin_services.ic
+++ b/sql/sql_plugin_services.ic
@@ -163,6 +163,7 @@ static struct wsrep_service_st wsrep_handler = {
wsrep_run_wsrep_commit,
wsrep_thd_LOCK,
wsrep_thd_UNLOCK,
+ wsrep_LOCK,
wsrep_thd_awake,
wsrep_thd_conflict_state,
wsrep_thd_conflict_state_str,
@@ -186,7 +187,8 @@ static struct wsrep_service_st wsrep_handler = {
wsrep_trx_order_before,
wsrep_unlock_rollback,
wsrep_set_data_home_dir,
- wsrep_thd_is_applier
+ wsrep_thd_is_applier,
+ wsrep_enqueue_background_kill
};
static struct thd_specifics_service_st thd_specifics_handler=
diff --git a/sql/sql_show.h b/sql/sql_show.h
index 39cbc35230a..804f44a7ffe 100644
--- a/sql/sql_show.h
+++ b/sql/sql_show.h
@@ -143,7 +143,7 @@ const char* get_one_variable(THD *thd, const SHOW_VAR *variable,
/* These functions were under INNODB_COMPATIBILITY_HOOKS */
int get_quote_char_for_identifier(THD *thd, const char *name, size_t length);
-THD *find_thread_by_id(longlong id, bool query_id= false);
+THD *find_thread_by_id(longlong id, bool query_id= false, bool lock_thd_data= false);
class select_result_explain_buffer;
/*
diff --git a/sql/wsrep_dummy.cc b/sql/wsrep_dummy.cc
index 1af74035355..feb86987b3d 100644
--- a/sql/wsrep_dummy.cc
+++ b/sql/wsrep_dummy.cc
@@ -16,6 +16,7 @@
#include "mariadb.h"
#include <sql_class.h>
#include <mysql/service_wsrep.h>
+#include "wsrep_mysqld.h"
my_bool wsrep_thd_is_BF(THD *, my_bool)
{ return 0; }
@@ -163,3 +164,10 @@ void wsrep_log(void (*)(const char *, ...), const char *, ...)
my_bool wsrep_thd_is_applier(MYSQL_THD thd)
{ return false; }
+
+bool wsrep_enqueue_background_kill(wsrep_kill_t item)
+{ return false;}
+
+void wsrep_LOCK(THD *)
+{ }
+
diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc
index 05be257cbcb..1ad704aec1a 100644
--- a/sql/wsrep_hton.cc
+++ b/sql/wsrep_hton.cc
@@ -142,10 +142,10 @@ void wsrep_post_commit(THD* thd, bool all)
=> cleanup
*/
if (thd->wsrep_conflict_state != MUST_REPLAY)
- {
- WSREP_DEBUG("cleanup transaction for LOCAL_STATE: %s",
- WSREP_QUERY(thd));
- }
+ {
+ WSREP_DEBUG("cleanup transaction for LOCAL_STATE: %s",
+ wsrep_thd_query(thd));
+ }
/*
Run post-rollback hook to clean up in the case if
some keys were populated for the transaction in provider
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index cfba0ace2cb..df2cbc3f4d3 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -37,6 +37,8 @@
#include <cstdio>
#include <cstdlib>
#include "log_event.h"
+#include <list>
+#include <algorithm>
wsrep_t *wsrep = NULL;
/*
@@ -131,6 +133,8 @@ mysql_cond_t COND_wsrep_replaying;
mysql_mutex_t LOCK_wsrep_slave_threads;
mysql_mutex_t LOCK_wsrep_desync;
mysql_mutex_t LOCK_wsrep_config_state;
+mysql_mutex_t LOCK_wsrep_kill;
+mysql_cond_t COND_wsrep_kill;
int wsrep_replaying= 0;
ulong wsrep_running_threads = 0; // # of currently running wsrep
@@ -138,6 +142,7 @@ ulong wsrep_running_threads = 0; // # of currently running wsrep
ulong wsrep_running_applier_threads = 0; // # of running applier threads
ulong wsrep_running_rollbacker_threads = 0; // # of running
// # rollbacker threads
+ulong wsrep_running_killer_threads = 0;
ulong my_bind_addr;
#ifdef HAVE_PSI_INTERFACE
@@ -145,11 +150,13 @@ PSI_mutex_key key_LOCK_wsrep_rollback,
key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst,
key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init,
key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync,
- key_LOCK_wsrep_config_state;
+ key_LOCK_wsrep_config_state,
+ key_LOCK_wsrep_kill;
PSI_cond_key key_COND_wsrep_rollback,
key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst,
- key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread;
+ key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread,
+ key_COND_wsrep_kill;
PSI_file_key key_file_wsrep_gra_log;
@@ -164,7 +171,8 @@ static PSI_mutex_info wsrep_mutexes[]=
{ &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL},
- { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL}
+ { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wsrep_kill, "LOCK_wsrep_kill", PSI_FLAG_GLOBAL}
};
static PSI_cond_info wsrep_conds[]=
@@ -174,7 +182,8 @@ static PSI_cond_info wsrep_conds[]=
{ &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL},
{ &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0},
{ &key_COND_wsrep_rollback, "COND_wsrep_rollback", PSI_FLAG_GLOBAL},
- { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}
+ { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL},
+ { &key_COND_wsrep_kill, "COND_wsrep_kill", PSI_FLAG_GLOBAL}
};
static PSI_file_info wsrep_files[]=
@@ -183,14 +192,15 @@ static PSI_file_info wsrep_files[]=
};
PSI_thread_key key_wsrep_sst_joiner, key_wsrep_sst_donor,
- key_wsrep_rollbacker, key_wsrep_applier;
+ key_wsrep_rollbacker, key_wsrep_applier, key_wsrep_killer;
static PSI_thread_info wsrep_threads[]=
{
{&key_wsrep_sst_joiner, "wsrep_sst_joiner_thread", PSI_FLAG_GLOBAL},
{&key_wsrep_sst_donor, "wsrep_sst_donor_thread", PSI_FLAG_GLOBAL},
{&key_wsrep_rollbacker, "wsrep_rollbacker_thread", PSI_FLAG_GLOBAL},
- {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL}
+ {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL},
+ {&key_wsrep_killer, "wsrep_killer_thread", PSI_FLAG_GLOBAL}
};
#endif /* HAVE_PSI_INTERFACE */
@@ -237,6 +247,7 @@ wsp::Config_state *wsrep_config_state;
// if there was no state gap on receiving first view event.
static my_bool wsrep_startup = TRUE;
+std::list< wsrep_kill_t > wsrep_kill_list;
static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) {
switch (level) {
@@ -829,6 +840,8 @@ void wsrep_thr_init()
mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_kill, &LOCK_wsrep_kill, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_kill, &COND_wsrep_kill, NULL);
DBUG_VOID_RETURN;
}
@@ -865,6 +878,7 @@ void wsrep_init_startup (bool first)
if (!wsrep_start_replication()) unireg_abort(1);
wsrep_create_rollbacker();
+ wsrep_create_killer();
wsrep_create_appliers(1);
if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed
@@ -906,6 +920,8 @@ void wsrep_thr_deinit()
mysql_mutex_destroy(&LOCK_wsrep_slave_threads);
mysql_mutex_destroy(&LOCK_wsrep_desync);
mysql_mutex_destroy(&LOCK_wsrep_config_state);
+ mysql_mutex_destroy(&LOCK_wsrep_kill);
+ mysql_cond_destroy(&COND_wsrep_kill);
delete wsrep_config_state;
wsrep_config_state= 0; // Safety
}
@@ -1657,7 +1673,7 @@ static int wsrep_TOI_begin(THD *thd, const char *db_, const char *table_,
if (wsrep_can_run_in_toi(thd, db_, table_, table_list) == false)
{
- WSREP_DEBUG("No TOI for %s", WSREP_QUERY(thd));
+ WSREP_DEBUG("No TOI for %s", wsrep_thd_query(thd));
return 1;
}
@@ -2147,9 +2163,13 @@ pthread_handler_t start_wsrep_THD(void *arg)
case WSREP_ROLLBACKER_THREAD:
wsrep_running_rollbacker_threads++;
break;
+ case WSREP_KILLER_THREAD:
+ wsrep_running_killer_threads++;
+ thd->wsrep_killer= true;
+ break;
default:
WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type);
- break;
+ assert(0);
}
mysql_cond_broadcast(&COND_thread_count);
@@ -2172,9 +2192,13 @@ pthread_handler_t start_wsrep_THD(void *arg)
DBUG_ASSERT(wsrep_running_rollbacker_threads > 0);
wsrep_running_rollbacker_threads--;
break;
+ case WSREP_KILLER_THREAD:
+ DBUG_ASSERT(wsrep_running_killer_threads > 0);
+ wsrep_running_killer_threads--;
+ break;
default:
WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type);
- break;
+ assert(0);
}
my_free(args);
@@ -2416,7 +2440,11 @@ void wsrep_close_client_connections(my_bool wait_to_end, THD *except_caller_thd)
}
DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count));
- WSREP_DEBUG("waiting for client connections to close: %u", thread_count);
+ WSREP_DEBUG("Waiting for client connections to close: %u", thread_count);
+ WSREP_DEBUG("Waiting for rollbacker threads to close: %lu", wsrep_running_rollbacker_threads);
+ WSREP_DEBUG("Waiting for applier threads to close: %lu", wsrep_running_applier_threads);
+ WSREP_DEBUG("Waiting for killer threads to close: %lu", wsrep_running_killer_threads);
+ WSREP_DEBUG("Waiting for wsrep threads to close: %lu", wsrep_running_threads);
while (wait_to_end && have_client_connections())
{
@@ -2450,7 +2478,7 @@ void wsrep_close_threads(THD *thd)
DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
(longlong) tmp->thread_id));
/* We skip slave threads & scheduler on this first loop through. */
- if (tmp->wsrep_applier && tmp != thd)
+ if ((tmp->wsrep_applier || tmp->wsrep_killer) && tmp != thd)
{
WSREP_DEBUG("closing wsrep thread %lld", (longlong) tmp->thread_id);
wsrep_close_thread (tmp);
@@ -2464,7 +2492,7 @@ void wsrep_wait_appliers_close(THD *thd)
{
/* Wait for wsrep appliers to gracefully exit */
mysql_mutex_lock(&LOCK_thread_count);
- while (wsrep_running_threads > 1)
+ while (wsrep_running_threads > 2)
// 1 is for rollbacker thread which needs to be killed explicitly.
// This gotta be fixed in a more elegant manner if we gonna have arbitrary
// number of non-applier wsrep threads.
@@ -2738,9 +2766,12 @@ extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id)
extern "C" void wsrep_thd_awake(THD *thd, my_bool signal)
{
+ mysql_mutex_assert_owner(&thd->LOCK_thd_data);
+ mysql_mutex_assert_owner(&thd->LOCK_thd_kill);
+
if (signal)
{
- thd->awake(KILL_QUERY);
+ thd->awake_no_mutex(KILL_QUERY);
}
else
{
@@ -2748,6 +2779,9 @@ extern "C" void wsrep_thd_awake(THD *thd, my_bool signal)
mysql_cond_broadcast(&COND_wsrep_replaying);
mysql_mutex_unlock(&LOCK_wsrep_replaying);
}
+
+ mysql_mutex_unlock(&thd->LOCK_thd_kill);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
}
@@ -3001,3 +3035,37 @@ bool wsrep_node_is_synced()
{
return (WSREP_ON) ? (wsrep_config_state->get_status() == 4) : false;
}
+
+bool wsrep_enqueue_background_kill(wsrep_kill_t item)
+{
+ std::list< wsrep_kill_t >::iterator it;
+ bool inserted= false;
+
+ mysql_mutex_lock(&LOCK_wsrep_kill);
+
+ for (it = wsrep_kill_list.begin(); it != wsrep_kill_list.end(); it++)
+ {
+ if ((*it).victim_thd_id == item.victim_thd_id)
+ break;
+ }
+
+ if(it != wsrep_kill_list.end())
+ {
+ WSREP_DEBUG("Thread: %lu already on kill list", item.victim_thd_id);
+ }
+ else
+ {
+ wsrep_kill_list.push_back(item);
+ mysql_cond_signal(&COND_wsrep_kill);
+ inserted= true;
+ }
+
+ mysql_mutex_unlock(&LOCK_wsrep_kill);
+ return inserted;
+}
+
+void wsrep_LOCK(THD* thd)
+{
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ mysql_mutex_lock(&thd->LOCK_thd_kill);
+}
diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h
index 55ea032e835..6353e936f01 100644
--- a/sql/wsrep_mysqld.h
+++ b/sql/wsrep_mysqld.h
@@ -18,9 +18,20 @@
#ifndef WSREP_MYSQLD_H
#define WSREP_MYSQLD_H
+#include <my_config.h>
+#include <stdint.h>
#include <mysql/plugin.h>
#include <mysql/service_wsrep.h>
+typedef struct wsrep_kill {
+ unsigned long victim_thd_id;
+ unsigned long bf_thd_id;
+ uint64_t victim_trx_id;
+ uint64_t bf_trx_id;
+ bool signal;
+ bool wait_lock;
+} wsrep_kill_t;
+
#ifdef WITH_WSREP
typedef struct st_mysql_show_var SHOW_VAR;
@@ -92,6 +103,7 @@ extern my_bool wsrep_slave_UK_checks;
extern ulong wsrep_running_threads;
extern ulong wsrep_running_applier_threads;
extern ulong wsrep_running_rollbacker_threads;
+extern ulong wsrep_running_killer_threads;
extern bool wsrep_new_cluster;
extern bool wsrep_gtid_mode;
extern uint32 wsrep_gtid_domain_id;
@@ -223,8 +235,6 @@ void wsrep_log(void (*fun)(const char *, ...), const char *format, ...);
#define WSREP_PROVIDER_EXISTS \
(wsrep_provider && strncasecmp(wsrep_provider, WSREP_NONE, FN_REFLEN))
-#define WSREP_QUERY(thd) (thd->query())
-
extern my_bool wsrep_ready_get();
extern void wsrep_ready_wait();
@@ -254,6 +264,8 @@ extern mysql_cond_t COND_wsrep_replaying;
extern mysql_mutex_t LOCK_wsrep_slave_threads;
extern mysql_mutex_t LOCK_wsrep_desync;
extern mysql_mutex_t LOCK_wsrep_config_state;
+extern mysql_mutex_t LOCK_wsrep_kill;
+extern mysql_cond_t COND_wsrep_kill;
extern wsrep_aborting_thd_t wsrep_aborting_thd;
extern my_bool wsrep_emulate_bin_log;
extern int wsrep_to_isolation;
@@ -278,6 +290,8 @@ extern PSI_mutex_key key_LOCK_wsrep_replaying;
extern PSI_cond_key key_COND_wsrep_replaying;
extern PSI_mutex_key key_LOCK_wsrep_slave_threads;
extern PSI_mutex_key key_LOCK_wsrep_desync;
+extern PSI_mutex_key key_LOCK_wsrep_kill;
+extern PSI_cond_key key_COND_wsrep_kill;
extern PSI_file_key key_file_wsrep_gra_log;
@@ -285,6 +299,7 @@ extern PSI_thread_key key_wsrep_sst_joiner;
extern PSI_thread_key key_wsrep_sst_donor;
extern PSI_thread_key key_wsrep_rollbacker;
extern PSI_thread_key key_wsrep_applier;
+extern PSI_thread_key key_wsrep_killer;
#endif /* HAVE_PSI_INTERFACE */
@@ -311,7 +326,8 @@ void thd_binlog_trx_reset(THD * thd);
enum wsrep_thread_type {
WSREP_APPLIER_THREAD=1,
- WSREP_ROLLBACKER_THREAD=2
+ WSREP_ROLLBACKER_THREAD=2,
+ WSREP_KILLER_THREAD=3
};
typedef void (*wsrep_thd_processor_fun)(THD *);
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc
index 9515fd550f2..77bd32a97d2 100644
--- a/sql/wsrep_thd.cc
+++ b/sql/wsrep_thd.cc
@@ -22,11 +22,16 @@
//#include "global_threads.h" // LOCK_thread_count, etc.
#include "sql_base.h" // close_thread_tables()
#include "mysqld.h" // start_wsrep_THD();
+#include "sql_show.h" // find_thread_by_id
#include "slave.h" // opt_log_slave_updates
#include "rpl_filter.h"
#include "rpl_rli.h"
#include "rpl_mi.h"
+#include "debug_sync.h"
+#include <list>
+
+extern std::list< wsrep_kill_t > wsrep_kill_list;
#if (__LP64__)
static volatile int64 wsrep_bf_aborts_counter(0);
@@ -280,7 +285,7 @@ void wsrep_replay_sp_transaction(THD* thd)
WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s",
rcode,
(thd->db.str ? thd->db.str : "(null)"),
- WSREP_QUERY(thd));
+ wsrep_thd_query(thd));
/* we're now in inconsistent state, must abort */
mysql_mutex_unlock(&thd->LOCK_thd_data);
unireg_abort(1);
@@ -528,12 +533,27 @@ static bool create_wsrep_THD(wsrep_thread_args* args, bool thread_count_lock)
mysql_mutex_lock(&LOCK_thread_count);
ulong old_wsrep_running_threads= wsrep_running_threads;
+ PSI_thread_key key;
- DBUG_ASSERT(args->thread_type == WSREP_APPLIER_THREAD ||
- args->thread_type == WSREP_ROLLBACKER_THREAD);
+#ifdef HAVE_PSI_INTERFACE
+ switch(args->thread_type)
+ {
+ case WSREP_APPLIER_THREAD:
+ key = key_wsrep_applier;
+ break;
+ case WSREP_ROLLBACKER_THREAD:
+ key = key_wsrep_rollbacker;
+ break;
+ case WSREP_KILLER_THREAD:
+ key = key_wsrep_killer;
+ break;
+ default:
+ WSREP_ERROR("Incorrect thread type %d", args->thread_type);
+ assert(0);
+ }
+#endif /* HAVE_PSI_INTERFACE */
- bool res= mysql_thread_create(args->thread_type == WSREP_APPLIER_THREAD
- ? key_wsrep_applier : key_wsrep_rollbacker,
+ bool res= mysql_thread_create(key,
&args->thread_id, &connection_attrib,
start_wsrep_THD, (void*)args);
@@ -561,6 +581,348 @@ static bool create_wsrep_THD(wsrep_thread_args* args, bool thread_count_lock)
return res;
}
+static void wsrep_abort_slave_trx(long long bf_seqno, long long victim_seqno)
+{
+ WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be "
+ "caused by:\n\t"
+ "1) unsupported configuration options combination, please check documentation.\n\t"
+ "2) a bug in the code.\n\t"
+ "3) a database corruption.\n Node consistency compromized, "
+ "need to abort. Restart the node to resync with cluster.",
+ bf_seqno, victim_seqno);
+ abort();
+}
+
+static int wsrep_kill(wsrep_kill_t* item)
+{
+ bool signal= item->signal;
+ unsigned long long victim_trx_id= static_cast<unsigned long long>(item->victim_trx_id);
+ unsigned long long bf_trx_id= static_cast<unsigned long long>(item->bf_trx_id);
+
+ // Note that find_thread_by_id will acquire LOCK_thd_kill mutex
+ // for thd if it's found
+ THD* bf_thd= find_thread_by_id(item->bf_thd_id, false, false);
+
+ if (!bf_thd)
+ {
+ WSREP_ERROR("BF thread: %lu not found", item->bf_thd_id);
+ assert(0);
+ }
+
+ long long bf_seqno= wsrep_thd_trx_seqno(bf_thd);
+
+ WSREP_DEBUG("Aborter %s trx_id: %llu thread: %ld "
+ "seqno: %lld query_state: %s conflict_state: %s query: %s",
+ wsrep_thd_is_BF(bf_thd, false) ? "BF" : "normal",
+ bf_trx_id,
+ item->bf_thd_id,
+ bf_seqno,
+ wsrep_thd_query_state_str(bf_thd),
+ wsrep_thd_conflict_state_str(bf_thd),
+ wsrep_thd_query(bf_thd));
+
+ // Note that we need to release LOCK_thd_kill mutex from BF thread
+ // to obey safe mutex ordering of LOCK_thread_count -> LOCK_thd_data
+ // that both are taken on find_thread_by_id below
+ mysql_mutex_unlock(&bf_thd->LOCK_thd_kill);
+
+ THD* thd= find_thread_by_id(item->victim_thd_id, false, true);
+
+ if (!thd)
+ {
+ WSREP_DEBUG("Victim thread: %lu not found", item->victim_thd_id);
+ return(0);
+ }
+
+ WSREP_LOG_CONFLICT(bf_thd, thd, TRUE);
+
+ unsigned long victim_thread= item->victim_thd_id;
+ long long victim_seqno= wsrep_thd_trx_seqno(thd);
+
+ WSREP_DEBUG("Victim %s trx_id: %llu thread: %ld "
+ "seqno: %lld query_state: %s conflict_state: %s query: %s",
+ wsrep_thd_is_BF(thd, false) ? "BF" : "normal",
+ victim_trx_id,
+ victim_thread,
+ victim_seqno,
+ wsrep_thd_query_state_str(thd),
+ wsrep_thd_conflict_state_str(thd),
+ wsrep_thd_query(thd));
+
+ if (wsrep_thd_query_state(thd) == QUERY_EXITING)
+ {
+ WSREP_DEBUG("Victim query state QUERY_EXITING trx: %llu"
+ " thread: %lu",
+ victim_trx_id,
+ victim_thread);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ mysql_mutex_unlock(&thd->LOCK_thd_kill);
+ return(0);
+ }
+
+ if (wsrep_thd_exec_mode(thd) != LOCAL_STATE)
+ {
+ WSREP_DEBUG("Victim withdraw of non local for BF trx: %llu "
+ ", thread: %lu exec_mode: %s",
+ victim_trx_id,
+ victim_thread,
+ wsrep_thd_exec_mode_str(thd));
+ }
+
+ switch (wsrep_thd_get_conflict_state(thd))
+ {
+ case NO_CONFLICT:
+ WSREP_DEBUG("Victim thread: %lu trx: %llu in NO_CONFLICT state",
+ victim_thread,
+ victim_trx_id);
+ wsrep_thd_set_conflict_state(thd, MUST_ABORT);
+ break;
+ case MUST_ABORT:
+ WSREP_DEBUG("Victim thread: %lu trx: %llu in MUST_ABORT state",
+ victim_thread,
+ victim_trx_id);
+ wsrep_thd_awake(thd, signal);
+ return(0);
+ break;
+ case ABORTED:
+ case ABORTING: // fall through
+ default:
+ WSREP_DEBUG("Victim thread: %lu trx: %llu in state: %s",
+ victim_thread,
+ victim_trx_id,
+ wsrep_thd_conflict_state_str(thd));
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ mysql_mutex_unlock(&thd->LOCK_thd_kill);
+ return(0);
+ break;
+ }
+
+ switch (wsrep_thd_query_state(thd))
+ {
+ case QUERY_COMMITTING:
+ {
+ enum wsrep_status rcode=WSREP_OK;
+
+ WSREP_DEBUG("Victim kill trx QUERY_COMMITTING state thread: %ld trx: %llu",
+ victim_thread,
+ victim_trx_id);
+
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV)
+ {
+ WSREP_DEBUG("Victim REPL_RECV abort slave thread: %ld trx: %llu"
+ " bf_seqno: %lld victim_seqno: %lld",
+ victim_thread,
+ victim_trx_id,
+ bf_seqno,
+ victim_seqno);
+
+ wsrep_abort_slave_trx(bf_seqno, victim_seqno);
+ }
+ else
+ {
+ wsrep_t *wsrep= get_wsrep();
+
+ rcode= wsrep->abort_pre_commit(wsrep, bf_seqno,
+ (wsrep_trx_id_t)wsrep_thd_ws_handle(thd)->trx_id);
+
+ switch (rcode)
+ {
+ case WSREP_WARNING:
+ {
+ WSREP_DEBUG("Victim cancel commit warning thread: %lu trx: %llu",
+ victim_thread,
+ victim_trx_id);
+
+ wsrep_thd_awake(thd, signal);
+ return(1);
+ break;
+ }
+ case WSREP_OK:
+ break;
+ default:
+ {
+ WSREP_ERROR("Victim cancel commit bad commit exit thread: "
+ "%lu trx: %llu rcode: %d ",
+ victim_thread,
+ victim_trx_id,
+ rcode);
+ /* unable to interrupt, must abort */
+ /* note: kill_mysql() will block, if we cannot.
+ * kill the lock holder first. */
+ abort();
+ break;
+ }
+ }
+ }
+
+ wsrep_thd_awake(thd, signal);
+ break;
+ }
+ case QUERY_EXEC:
+ {
+ /* it is possible that victim trx is itself waiting for some
+ * other lock. We need to cancel this waiting */
+ WSREP_DEBUG("Victim kill trx QUERY_EXEC state thread: %ld trx: %llu",
+ victim_thread, victim_trx_id);
+
+ bool wait_lock= item->wait_lock;
+
+ if (wait_lock)
+ {
+ WSREP_DEBUG("Victim thread: %lu trx: %llu has lock wait flag",
+ victim_thread,
+ victim_trx_id);
+
+ wsrep_thd_awake(thd, signal);
+ }
+ else
+ {
+ /* Abort currently executing query */
+ WSREP_DEBUG("Kill query for victim thread: %lu trx: %llu",
+ victim_thread,
+ victim_trx_id);
+
+ wsrep_thd_awake(thd, signal);
+
+ /* for BF thd, we need to prevent him from committing */
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV)
+ {
+ WSREP_DEBUG("Victim REPL_RECV abort slave for thread: "
+ "%lu trx: %llu"
+ " bf_seqno: %lld victim_seqno: %lld",
+ victim_thread,
+ victim_trx_id,
+ bf_seqno,
+ victim_seqno);
+
+ wsrep_abort_slave_trx(bf_seqno, victim_seqno);
+ }
+ }
+ break;
+ }
+ case QUERY_IDLE:
+ {
+ WSREP_DEBUG("Victim kill trx QUERY_IDLE state thread: %ld trx: %llu",
+ victim_thread,
+ victim_trx_id);
+
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV)
+ {
+ WSREP_DEBUG("Victim REPL_RECV kill BF IDLE, thread: %ld trx: "
+ "%llu bf_seqno: %lld victim_seqno: %lld",
+ victim_thread,
+ victim_trx_id,
+ bf_seqno,
+ victim_seqno);
+
+ wsrep_thd_UNLOCK(thd);
+ wsrep_abort_slave_trx(bf_seqno, victim_seqno);
+ return(0);
+ }
+
+ /* This will lock thd from proceeding after net_read() */
+ wsrep_thd_set_conflict_state(thd, ABORTING);
+
+ wsrep_lock_rollback();
+
+ if (wsrep_aborting_thd_contains(thd))
+ {
+ WSREP_WARN("Victim is duplicate thd aborter thread: %ld trx: %llu",
+ victim_thread,
+ victim_trx_id);
+ }
+ else
+ {
+ wsrep_aborting_thd_enqueue(thd);
+ WSREP_DEBUG("Enqueuing victim thread: %ld trx: %llu for abort",
+ victim_thread,
+ victim_trx_id);
+ }
+
+ wsrep_unlock_rollback();
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ mysql_mutex_unlock(&thd->LOCK_thd_kill);
+
+ break;
+ }
+ default:
+ {
+ WSREP_WARN("Victim thread: %ld trx: %llu in bad wsrep query state: %s",
+ victim_thread,
+ victim_trx_id,
+ wsrep_thd_query_state_str(thd));
+
+ assert(0);
+ break;
+ }
+ }
+
+ return(0);
+}
+
+static void wsrep_process_kill(THD *thd)
+{
+ DBUG_ENTER("wsrep_process_kill");
+
+ mysql_mutex_lock(&LOCK_wsrep_kill);
+
+ WSREP_DEBUG("WSREP killer thread started");
+
+ while (thd->killed == NOT_KILLED)
+ {
+ thd_proc_info(thd, "wsrep killer idle");
+ thd->mysys_var->current_mutex= &LOCK_wsrep_kill;
+ thd->mysys_var->current_cond= &COND_wsrep_kill;
+
+ mysql_cond_wait(&COND_wsrep_kill,&LOCK_wsrep_kill);
+
+ WSREP_DEBUG("WSREP killer thread wakes for signal");
+
+ mysql_mutex_lock(&thd->mysys_var->mutex);
+ thd_proc_info(thd, "wsrep killer active");
+ thd->mysys_var->current_mutex= 0;
+ thd->mysys_var->current_cond= 0;
+ mysql_mutex_unlock(&thd->mysys_var->mutex);
+
+ /* process all entries in the queue */
+ while (!wsrep_kill_list.empty())
+ {
+ wsrep_kill_t to_be_killed= wsrep_kill_list.front();
+ // Release list mutex while we kill one thread
+ mysql_mutex_unlock(&LOCK_wsrep_kill);
+ wsrep_kill(&to_be_killed);
+ mysql_mutex_lock(&LOCK_wsrep_kill);
+ wsrep_kill_list.pop_front();
+ }
+ }
+
+ assert(wsrep_kill_list.empty());
+
+ mysql_mutex_unlock(&LOCK_wsrep_kill);
+ sql_print_information("WSREP: killer thread exiting");
+ DBUG_PRINT("wsrep",("wsrep killer thread exiting"));
+ DBUG_VOID_RETURN;
+}
+
+
+void wsrep_create_killer()
+{
+ wsrep_thread_args* arg;
+ if((arg = (wsrep_thread_args*)my_malloc(sizeof(wsrep_thread_args), MYF(0))) == NULL) {
+ WSREP_ERROR("Can't allocate memory for wsrep background killer thread");
+ assert(0);
+ }
+
+ arg->thread_type = WSREP_KILLER_THREAD;
+ arg->processor = wsrep_process_kill;
+
+ if (create_wsrep_THD(arg, false)) {
+ WSREP_WARN("Can't create thread to manage wsrep background kill");
+ my_free(arg);
+ return;
+ }
+}
+
bool wsrep_create_appliers(long threads, bool thread_count_lock)
{
if (!wsrep_connected)
@@ -656,7 +1018,7 @@ static void wsrep_rollback_process(THD *thd)
mysql_mutex_unlock(&aborting->LOCK_thd_data);
- set_current_thd(aborting);
+ set_current_thd(aborting);
aborting->store_globals();
mysql_mutex_lock(&aborting->LOCK_thd_data);
@@ -666,7 +1028,7 @@ static void wsrep_rollback_process(THD *thd)
(longlong) aborting->real_id);
mysql_mutex_unlock(&aborting->LOCK_thd_data);
- set_current_thd(thd);
+ set_current_thd(thd);
thd->store_globals();
mysql_mutex_lock(&LOCK_wsrep_rollback);
diff --git a/sql/wsrep_thd.h b/sql/wsrep_thd.h
index 10efcbefbf6..51700d31e63 100644
--- a/sql/wsrep_thd.h
+++ b/sql/wsrep_thd.h
@@ -29,6 +29,7 @@ void wsrep_replay_sp_transaction(THD* thd);
void wsrep_replay_transaction(THD *thd);
bool wsrep_create_appliers(long threads, bool thread_count_lock=false);
void wsrep_create_rollbacker();
+void wsrep_create_killer();
int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr,
my_bool signal);
diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc
index f18dc565329..be3a55557e7 100644
--- a/sql/wsrep_var.cc
+++ b/sql/wsrep_var.cc
@@ -500,6 +500,7 @@ bool wsrep_cluster_address_update (sys_var *self, THD* thd, enum_var_type type)
if (wsrep_start_replication())
{
wsrep_create_rollbacker();
+ wsrep_create_killer();
WSREP_DEBUG("Cluster address update creating %ld applier threads running %lu",
wsrep_slave_threads, wsrep_running_applier_threads);
wsrep_create_appliers(wsrep_slave_threads);