summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan Lindström <jan.lindstrom@mariadb.com>2020-03-12 15:34:50 +0200
committerJan Lindström <jan.lindstrom@mariadb.com>2020-03-13 13:40:00 +0200
commitdd68a682aff2f17750abf9054e1facd2c238a182 (patch)
tree1936f8a6d1b331e4b54d85c0c0c2b25cb4ee8e80
parented21202a14e981a997061db12b2a377910fb02d5 (diff)
downloadmariadb-git-10.2-MDEV-21910.tar.gz
MDEV-21910 : KIlling thread on Galera could cause mutex deadlock10.2-MDEV-21910
Following issues here: Whenever Galera BF (brute force) transaction decides to abort conflicting transaction it will kill that thread using thd::awake() User KILL [QUERY|CONNECTION] ... for a thread it will also call thd::awake() Whenever one of these actions is executed we will hold number of InnoDB internal mutexes and thd mutexes. Sometimes these mutexes are taken in different order causing mutex deadlock. Lets consider a example starting from Galera BF transaction deciding to abort conflicting transaction (let's call this thread1): thread1: lock_rec_other_has_conflicting here we hold both lock_sys mutex and trx mutex for conflicting lock transaction. wsrep_innobase_kill_one_trx For victim we call wsrep_thd_awake Next thread2 we assume user to execute KILL QUERY to some other executing query (note it can't be BF query). thread2: sql_kill() kill_one_thread find_thread_by_id takes LOCK_thd_kill thd->awake_no_mutex() thread1: thd->awake(KILL_QUERY) Tries to have LOCK_thd_kill but it is hold by thread2 so we wait (note that we still hold lock_sys mutex and trx mutex). thread2: ha_kill_query() kill_handlerton innobase_kill_query lock_trx_handle_wait lock_mutex_enter() must wait as thread1 is holding it Thus thread1 lock_sys, trx_mutex waits -> thread2 LOCK_thd_kill waits lock_sys -> thread1 ==> thread1 waits -> thread2 waits -> thread1 ==> mutex deadlock. In this patch we will fix Galera BF and user kill cases so that we enqueue victim thread to a list while we hold InnoDB mutexes and we then release them. A new background thread will pick victim thread from this new list and uses thd::awake() with no InnoDB mutexes. Idea is similar to replication background kill. This fix enforces that we take LOCK_thd_data -> lock sys mutex -> trx mutex always in this order. wsrep_mysqld.cc Here we introduce a list where victim threads are stored, condition variable to be used to wake up background thread and mutex to protect list. wsrep_thd.cc Create a new background thread to handle victim thread abort. We may take wsrep_thd_LOCK mutex here but not any InnoDB mutexes. wsrep_innobase_kill_one_trx Remove all the wsrep code that was moved to wsrep_thd.cc We just enqueue required information to background kill list and cancel victim trx lock wait if there is such. Here we have InnoDB lock sys mutex and trx mutex so here we can't take wsrep_thd_LOCK mutex. wsrep_abort_transaction Cleanup only.
-rw-r--r--include/mysql/service_wsrep.h9
-rw-r--r--mysql-test/suite/galera/disabled.def1
-rw-r--r--mysql-test/suite/galera/r/galera_bf_kill.result49
-rw-r--r--mysql-test/suite/galera/r/galera_performance_schema.result3
-rw-r--r--mysql-test/suite/galera/r/galera_serializable.result8
-rw-r--r--mysql-test/suite/galera/t/MW-286.cnf10
-rw-r--r--mysql-test/suite/galera/t/MW-286.test2
-rw-r--r--mysql-test/suite/galera/t/galera_bf_kill.cnf10
-rw-r--r--mysql-test/suite/galera/t/galera_bf_kill.test93
-rw-r--r--mysql-test/suite/galera/t/galera_drop_database.cnf10
-rw-r--r--mysql-test/suite/galera/t/galera_serializable.test17
-rw-r--r--mysql-test/suite/wsrep/r/variables.result6
-rw-r--r--sql/sql_class.h2
-rw-r--r--sql/sql_plugin_services.ic4
-rw-r--r--sql/wsrep_dummy.cc6
-rw-r--r--sql/wsrep_mysqld.cc102
-rw-r--r--sql/wsrep_mysqld.h22
-rw-r--r--sql/wsrep_thd.cc355
-rw-r--r--sql/wsrep_thd.h1
-rw-r--r--sql/wsrep_var.cc1
-rw-r--r--storage/innobase/handler/ha_innodb.cc259
-rw-r--r--storage/innobase/include/ha_prototypes.h2
-rw-r--r--storage/innobase/lock/lock0lock.cc4
23 files changed, 719 insertions, 257 deletions
diff --git a/include/mysql/service_wsrep.h b/include/mysql/service_wsrep.h
index 923ba57fcdc..f398ccad699 100644
--- a/include/mysql/service_wsrep.h
+++ b/include/mysql/service_wsrep.h
@@ -70,6 +70,7 @@ struct xid_t;
struct wsrep;
struct wsrep_ws_handle;
struct wsrep_buf;
+typedef struct wsrep_kill wsrep_kill_t;
extern struct wsrep_service_st {
struct wsrep * (*get_wsrep_func)();
@@ -114,7 +115,9 @@ extern struct wsrep_service_st {
int (*wsrep_trx_order_before_func)(MYSQL_THD, MYSQL_THD);
void (*wsrep_unlock_rollback_func)();
void (*wsrep_set_data_home_dir_func)(const char *data_dir);
- my_bool (*wsrep_thd_is_applier_func)(MYSQL_THD);
+ my_bool (*wsrep_thd_is_applier_func)(MYSQL_THD thd);
+ bool (*wsrep_enqueue_background_kill_func)(wsrep_kill_t*);
+ wsrep_kill_t* (*wsrep_dequeue_background_kill_func)();
} *wsrep_service;
#ifdef MYSQL_DYNAMIC_PLUGIN
@@ -161,6 +164,8 @@ extern struct wsrep_service_st {
#define wsrep_unlock_rollback() wsrep_service->wsrep_unlock_rollback_func()
#define wsrep_set_data_home_dir(A) wsrep_service->wsrep_set_data_home_dir_func(A)
#define wsrep_thd_is_applier(T) wsrep_service->wsrep_thd_is_applier_func(T)
+#define wsrep_enqueue_background_kill(T) wsrep_service->wsrep_enqueue_background_kill_func(T);
+#define wsrep_dequeue_background_kill() wsrep_service->wsrep_dequeue_background_kill_func();
#define wsrep_debug get_wsrep_debug()
#define wsrep_log_conflicts get_wsrep_log_conflicts()
@@ -223,6 +228,8 @@ bool wsrep_thd_ignore_table(THD *thd);
void wsrep_unlock_rollback();
void wsrep_set_data_home_dir(const char *data_dir);
my_bool wsrep_thd_is_applier(MYSQL_THD thd);
+bool wsrep_enqueue_background_kill(wsrep_kill_t*);
+wsrep_kill_t* wsrep_dequeue_background_kill();
#endif
#ifdef __cplusplus
diff --git a/mysql-test/suite/galera/disabled.def b/mysql-test/suite/galera/disabled.def
index 27691c8f4b5..4a819acd6dd 100644
--- a/mysql-test/suite/galera/disabled.def
+++ b/mysql-test/suite/galera/disabled.def
@@ -10,7 +10,6 @@
#
##############################################################################
-MW-286 : MDEV-18464 Killing thread can cause mutex deadlock if done concurrently with Galera/replication victim kill
MW-328A : MDEV-21483 galera.MW-328A galera.MW-328B
MW-328B : MDEV-21483 galera.MW-328A galera.MW-328B
MW-329 : MDEV-19962 Galera test failure on MW-329
diff --git a/mysql-test/suite/galera/r/galera_bf_kill.result b/mysql-test/suite/galera/r/galera_bf_kill.result
new file mode 100644
index 00000000000..bd93b29b2db
--- /dev/null
+++ b/mysql-test/suite/galera/r/galera_bf_kill.result
@@ -0,0 +1,49 @@
+connection node_2;
+CREATE TABLE t1(a int not null primary key auto_increment,b int) engine=InnoDB;
+insert into t1 values (NULL,1);
+connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2a;
+begin;
+update t1 set a =5;
+connection node_2;
+select * from t1;
+a b
+2 1
+disconnect node_2a;
+connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2a;
+begin;
+update t1 set a =5;
+connection node_2;
+select * from t1;
+a b
+2 1
+disconnect node_2a;
+connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2a;
+begin;
+update t1 set a =5, b=2;
+connection node_2;
+ALTER TABLE t1 ADD UNIQUE KEY(b);
+select * from t1;
+a b
+2 1
+disconnect node_2a;
+connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2a;
+begin;
+update t1 set a =5, b=2;
+connect node_2b, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2b;
+begin;
+update t1 set a =6, b=7;
+connection node_2;
+ALTER TABLE t1 ADD UNIQUE KEY(b);
+Warnings:
+Note 1831 Duplicate index `b_2`. This is deprecated and will be disallowed in a future release
+select * from t1;
+a b
+2 1
+disconnect node_2a;
+disconnect node_2b;
+drop table t1;
diff --git a/mysql-test/suite/galera/r/galera_performance_schema.result b/mysql-test/suite/galera/r/galera_performance_schema.result
index 5b4994556d6..22b72572759 100644
--- a/mysql-test/suite/galera/r/galera_performance_schema.result
+++ b/mysql-test/suite/galera/r/galera_performance_schema.result
@@ -4,6 +4,7 @@ FROM threads
WHERE name LIKE 'thread/sql/wsrep%'
ORDER BY name;
name thread/sql/wsrep_applier_thread
+name thread/sql/wsrep_killer_thread
name thread/sql/wsrep_rollbacker_thread
use test;
create table t1 (a int not null primary key) engine=innodb;
@@ -12,6 +13,7 @@ use performance_schema;
select name from mutex_instances where name like 'wait/synch/mutex/sql/LOCK_wsrep%' order by name;
name wait/synch/mutex/sql/LOCK_wsrep_config_state
name wait/synch/mutex/sql/LOCK_wsrep_desync
+name wait/synch/mutex/sql/LOCK_wsrep_kill
name wait/synch/mutex/sql/LOCK_wsrep_ready
name wait/synch/mutex/sql/LOCK_wsrep_replaying
name wait/synch/mutex/sql/LOCK_wsrep_rollback
@@ -19,6 +21,7 @@ name wait/synch/mutex/sql/LOCK_wsrep_slave_threads
name wait/synch/mutex/sql/LOCK_wsrep_sst
name wait/synch/mutex/sql/LOCK_wsrep_sst_init
select name from cond_instances where name like 'wait/synch/cond/sql/COND_wsrep%' order by name;
+name wait/synch/cond/sql/COND_wsrep_kill
name wait/synch/cond/sql/COND_wsrep_ready
name wait/synch/cond/sql/COND_wsrep_replaying
name wait/synch/cond/sql/COND_wsrep_rollback
diff --git a/mysql-test/suite/galera/r/galera_serializable.result b/mysql-test/suite/galera/r/galera_serializable.result
index be3f93a081f..9280578b58a 100644
--- a/mysql-test/suite/galera/r/galera_serializable.result
+++ b/mysql-test/suite/galera/r/galera_serializable.result
@@ -27,11 +27,17 @@ ROLLBACK;
DELETE FROM t1;
connection node_1;
START TRANSACTION;
-connection node_1;
INSERT INTO t1 VALUES (1,1);
connection node_2;
INSERT INTO t1 VALUES (1,2);
connection node_1;
COMMIT;
ERROR 40001: Deadlock: wsrep aborted transaction
+SELECT * from t1;
+id f2
+1 2
+connection node_2;
+SELECT * from t1;
+id f2
+1 2
DROP TABLE t1;
diff --git a/mysql-test/suite/galera/t/MW-286.cnf b/mysql-test/suite/galera/t/MW-286.cnf
new file mode 100644
index 00000000000..8ec4c2333cb
--- /dev/null
+++ b/mysql-test/suite/galera/t/MW-286.cnf
@@ -0,0 +1,10 @@
+!include ../galera_2nodes.cnf
+
+[mysqld.1]
+wsrep-debug=ON
+wsrep-log-conflicts=ON
+
+[mysqld.2]
+wsrep-debug=ON
+wsrep-log-conflicts=ON
+
diff --git a/mysql-test/suite/galera/t/MW-286.test b/mysql-test/suite/galera/t/MW-286.test
index 426b4493bb7..fcfbf4c62c3 100644
--- a/mysql-test/suite/galera/t/MW-286.test
+++ b/mysql-test/suite/galera/t/MW-286.test
@@ -3,8 +3,6 @@
#
--source include/galera_cluster.inc
---source include/have_innodb.inc
---source include/big_test.inc
--connection node_1
CREATE TABLE ten (f1 INTEGER) Engine=InnoDB;
diff --git a/mysql-test/suite/galera/t/galera_bf_kill.cnf b/mysql-test/suite/galera/t/galera_bf_kill.cnf
new file mode 100644
index 00000000000..8ec4c2333cb
--- /dev/null
+++ b/mysql-test/suite/galera/t/galera_bf_kill.cnf
@@ -0,0 +1,10 @@
+!include ../galera_2nodes.cnf
+
+[mysqld.1]
+wsrep-debug=ON
+wsrep-log-conflicts=ON
+
+[mysqld.2]
+wsrep-debug=ON
+wsrep-log-conflicts=ON
+
diff --git a/mysql-test/suite/galera/t/galera_bf_kill.test b/mysql-test/suite/galera/t/galera_bf_kill.test
new file mode 100644
index 00000000000..dce6808fd56
--- /dev/null
+++ b/mysql-test/suite/galera/t/galera_bf_kill.test
@@ -0,0 +1,93 @@
+--source include/galera_cluster.inc
+
+#
+# Test case 1: Start a transaction on node_2a and kill it
+# from other connection on same node
+#
+
+--connection node_2
+CREATE TABLE t1(a int not null primary key auto_increment,b int) engine=InnoDB;
+insert into t1 values (NULL,1);
+
+--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2a
+begin;
+update t1 set a =5;
+
+--connection node_2
+
+#show create table information_schema.processlist;
+#select ID,USER,COMMAND,STATE,QUERY_ID from INFORMATION_SCHEMA.PROCESSLIST;
+
+--let $k_thread = `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1`
+
+--disable_query_log
+--eval KILL $k_thread
+--enable_query_log
+
+select * from t1;
+--disconnect node_2a
+
+#
+# Test case 2: Start a transaction on node_2a and use
+# kill query from other connection on same node
+#
+
+--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2a
+begin;
+update t1 set a =5;
+
+--connection node_2
+--let $k_thread = `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1`
+
+--disable_query_log
+--eval KILL QUERY $k_thread
+--enable_query_log
+
+select * from t1;
+--disconnect node_2a
+#
+# Test case 3: Start a transaction on node_2a and start a DDL on other transaction
+# that will then abort node_2a transaction
+#
+--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2a
+begin;
+update t1 set a =5, b=2;
+
+--connection node_2
+ALTER TABLE t1 ADD UNIQUE KEY(b);
+
+select * from t1;
+
+--disconnect node_2a
+
+#
+# Test case 4: Start a transaction on node_2a and conflicting transaction on node_2b
+# and start a DDL on other transaction that will then abort node_2a and node_2b
+# transactions
+#
+
+--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2a
+begin;
+update t1 set a =5, b=2;
+
+--connect node_2b, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2b
+begin;
+send update t1 set a =6, b=7;
+
+--connection node_2
+ALTER TABLE t1 ADD UNIQUE KEY(b);
+
+select * from t1;
+
+--disconnect node_2a
+--disconnect node_2b
+
+drop table t1;
+
+
+
diff --git a/mysql-test/suite/galera/t/galera_drop_database.cnf b/mysql-test/suite/galera/t/galera_drop_database.cnf
new file mode 100644
index 00000000000..8ec4c2333cb
--- /dev/null
+++ b/mysql-test/suite/galera/t/galera_drop_database.cnf
@@ -0,0 +1,10 @@
+!include ../galera_2nodes.cnf
+
+[mysqld.1]
+wsrep-debug=ON
+wsrep-log-conflicts=ON
+
+[mysqld.2]
+wsrep-debug=ON
+wsrep-log-conflicts=ON
+
diff --git a/mysql-test/suite/galera/t/galera_serializable.test b/mysql-test/suite/galera/t/galera_serializable.test
index b12d57fd488..28f6a7aa62d 100644
--- a/mysql-test/suite/galera/t/galera_serializable.test
+++ b/mysql-test/suite/galera/t/galera_serializable.test
@@ -7,7 +7,6 @@
#
--source include/galera_cluster.inc
---source include/have_innodb.inc
--connection node_1
@@ -26,7 +25,6 @@ SELECT * FROM t1;
--connection node_2
INSERT INTO t1 VALUES (1,1);
---sleep 2
--connection node_1
--error ER_LOCK_DEADLOCK
SELECT * FROM t1;
@@ -46,7 +44,6 @@ SELECT * FROM t1;
--connection node_2
UPDATE t1 SET f2 = 2;
---sleep 2
--connection node_1
--error ER_LOCK_DEADLOCK
UPDATE t1 SET f2 = 3;
@@ -62,8 +59,6 @@ DELETE FROM t1;
--connection node_1
START TRANSACTION;
-
---connection node_1
INSERT INTO t1 VALUES (1,1);
--connection node_2
@@ -73,4 +68,16 @@ INSERT INTO t1 VALUES (1,2);
--error ER_LOCK_DEADLOCK
COMMIT;
+--let $wait_condition = SELECT COUNT(*) = 1 FROM t1;
+--source include/wait_condition.inc
+
+SELECT * from t1;
+
+--connection node_2
+
+--let $wait_condition = SELECT COUNT(*) = 1 FROM t1;
+--source include/wait_condition.inc
+
+SELECT * from t1;
+
DROP TABLE t1;
diff --git a/mysql-test/suite/wsrep/r/variables.result b/mysql-test/suite/wsrep/r/variables.result
index 3abc861f3d0..96dcee47206 100644
--- a/mysql-test/suite/wsrep/r/variables.result
+++ b/mysql-test/suite/wsrep/r/variables.result
@@ -149,7 +149,7 @@ VARIABLE_VALUE
1
SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_thread_count';
VARIABLE_VALUE
-2
+3
SELECT @@global.wsrep_provider;
@@global.wsrep_provider
libgalera_smm.so
@@ -164,7 +164,7 @@ Variable_name Value
Threads_connected 1
SHOW STATUS LIKE 'wsrep_thread_count';
Variable_name Value
-wsrep_thread_count 2
+wsrep_thread_count 3
SET @wsrep_slave_threads_saved= @@global.wsrep_slave_threads;
SET GLOBAL wsrep_slave_threads= 10;
@@ -177,7 +177,7 @@ VARIABLE_VALUE
1
SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_thread_count';
VARIABLE_VALUE
-11
+12
SHOW STATUS LIKE 'threads_connected';
Variable_name Value
Threads_connected 1
diff --git a/sql/sql_class.h b/sql/sql_class.h
index b35f9a93238..95f08acf403 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -4409,6 +4409,8 @@ public:
#ifdef WITH_WSREP
const bool wsrep_applier; /* dedicated slave applier thread */
+ bool wsrep_killer; /* dedicated background
+ kill thread */
bool wsrep_applier_closing; /* applier marked to close */
bool wsrep_client_thread; /* to identify client threads*/
bool wsrep_PA_safe;
diff --git a/sql/sql_plugin_services.ic b/sql/sql_plugin_services.ic
index 20113444b64..7cae8a79efd 100644
--- a/sql/sql_plugin_services.ic
+++ b/sql/sql_plugin_services.ic
@@ -184,7 +184,9 @@ static struct wsrep_service_st wsrep_handler = {
wsrep_trx_order_before,
wsrep_unlock_rollback,
wsrep_set_data_home_dir,
- wsrep_thd_is_applier
+ wsrep_thd_is_applier,
+ wsrep_enqueue_background_kill,
+ wsrep_dequeue_background_kill
};
static struct thd_specifics_service_st thd_specifics_handler=
diff --git a/sql/wsrep_dummy.cc b/sql/wsrep_dummy.cc
index b98dd1e5790..a0e720e5dd0 100644
--- a/sql/wsrep_dummy.cc
+++ b/sql/wsrep_dummy.cc
@@ -150,3 +150,9 @@ void wsrep_set_data_home_dir(const char *)
my_bool wsrep_thd_is_applier(MYSQL_THD thd)
{ return false; }
+
+bool wsrep_enqueue_background_kill(wsrep_kill_t* item)
+{ return false;}
+
+wsrep_kill_t* wsrep_dequeue_background_kill()
+{ return NULL;}
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index ba6c2d24f77..54dda0a1caf 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -38,7 +38,8 @@
#include <cstdlib>
#include "log_event.h"
#include "sql_plugin.h" /* wsrep_plugins_pre_init() */
-#include <vector>
+#include <list>
+#include <algorithm>
wsrep_t *wsrep = NULL;
/*
@@ -133,6 +134,8 @@ mysql_cond_t COND_wsrep_replaying;
mysql_mutex_t LOCK_wsrep_slave_threads;
mysql_mutex_t LOCK_wsrep_desync;
mysql_mutex_t LOCK_wsrep_config_state;
+mysql_mutex_t LOCK_wsrep_kill;
+mysql_cond_t COND_wsrep_kill;
int wsrep_replaying= 0;
ulong wsrep_running_threads = 0; // # of currently running wsrep
@@ -140,6 +143,7 @@ ulong wsrep_running_threads = 0; // # of currently running wsrep
ulong wsrep_running_applier_threads = 0; // # of running applier threads
ulong wsrep_running_rollbacker_threads = 0; // # of running
// # rollbacker threads
+ulong wsrep_running_killer_threads = 0;
ulong my_bind_addr;
#ifdef HAVE_PSI_INTERFACE
@@ -147,11 +151,13 @@ PSI_mutex_key key_LOCK_wsrep_rollback,
key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst,
key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init,
key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync,
- key_LOCK_wsrep_config_state;
+ key_LOCK_wsrep_config_state,
+ key_LOCK_wsrep_kill;
PSI_cond_key key_COND_wsrep_rollback,
key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst,
- key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread;
+ key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread,
+ key_COND_wsrep_kill;
PSI_file_key key_file_wsrep_gra_log;
@@ -166,7 +172,8 @@ static PSI_mutex_info wsrep_mutexes[]=
{ &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL},
- { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL}
+ { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wsrep_kill, "LOCK_wsrep_kill", PSI_FLAG_GLOBAL}
};
static PSI_cond_info wsrep_conds[]=
@@ -176,7 +183,8 @@ static PSI_cond_info wsrep_conds[]=
{ &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL},
{ &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0},
{ &key_COND_wsrep_rollback, "COND_wsrep_rollback", PSI_FLAG_GLOBAL},
- { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}
+ { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL},
+ { &key_COND_wsrep_kill, "COND_wsrep_kill", PSI_FLAG_GLOBAL}
};
static PSI_file_info wsrep_files[]=
@@ -185,14 +193,15 @@ static PSI_file_info wsrep_files[]=
};
PSI_thread_key key_wsrep_sst_joiner, key_wsrep_sst_donor,
- key_wsrep_rollbacker, key_wsrep_applier;
+ key_wsrep_rollbacker, key_wsrep_applier, key_wsrep_killer;
static PSI_thread_info wsrep_threads[]=
{
{&key_wsrep_sst_joiner, "wsrep_sst_joiner_thread", PSI_FLAG_GLOBAL},
{&key_wsrep_sst_donor, "wsrep_sst_donor_thread", PSI_FLAG_GLOBAL},
{&key_wsrep_rollbacker, "wsrep_rollbacker_thread", PSI_FLAG_GLOBAL},
- {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL}
+ {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL},
+ {&key_wsrep_killer, "wsrep_killer_thread", PSI_FLAG_GLOBAL}
};
#endif /* HAVE_PSI_INTERFACE */
@@ -239,6 +248,7 @@ wsp::Config_state *wsrep_config_state;
// if there was no state gap on receiving first view event.
static my_bool wsrep_startup = TRUE;
+std::list< wsrep_kill_t* > wsrep_kill_list;
static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) {
switch (level) {
@@ -820,6 +830,8 @@ void wsrep_thr_init()
mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_kill, &LOCK_wsrep_kill, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_kill, &COND_wsrep_kill, NULL);
DBUG_VOID_RETURN;
}
@@ -856,6 +868,7 @@ void wsrep_init_startup (bool first)
if (!wsrep_start_replication()) unireg_abort(1);
wsrep_create_rollbacker();
+ wsrep_create_killer();
wsrep_create_appliers(1);
if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed
@@ -897,6 +910,8 @@ void wsrep_thr_deinit()
mysql_mutex_destroy(&LOCK_wsrep_slave_threads);
mysql_mutex_destroy(&LOCK_wsrep_desync);
mysql_mutex_destroy(&LOCK_wsrep_config_state);
+ mysql_mutex_destroy(&LOCK_wsrep_kill);
+ mysql_cond_destroy(&COND_wsrep_kill);
delete wsrep_config_state;
wsrep_config_state= 0; // Safety
}
@@ -1648,7 +1663,7 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
if (wsrep_can_run_in_toi(thd, db_, table_, table_list) == false)
{
- WSREP_DEBUG("No TOI for %s", WSREP_QUERY(thd));
+ WSREP_DEBUG("No TOI for %s", wsrep_thd_query(thd));
return 1;
}
@@ -2144,9 +2159,13 @@ pthread_handler_t start_wsrep_THD(void *arg)
case WSREP_ROLLBACKER_THREAD:
wsrep_running_rollbacker_threads++;
break;
+ case WSREP_KILLER_THREAD:
+ wsrep_running_killer_threads++;
+ thd->wsrep_killer= true;
+ break;
default:
WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type);
- break;
+ assert(0);
}
mysql_cond_broadcast(&COND_thread_count);
@@ -2169,9 +2188,13 @@ pthread_handler_t start_wsrep_THD(void *arg)
DBUG_ASSERT(wsrep_running_rollbacker_threads > 0);
wsrep_running_rollbacker_threads--;
break;
+ case WSREP_KILLER_THREAD:
+ DBUG_ASSERT(wsrep_running_killer_threads > 0);
+ wsrep_running_killer_threads--;
+ break;
default:
WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type);
- break;
+ assert(0);
}
my_free(args);
@@ -2418,7 +2441,11 @@ void wsrep_close_client_connections(my_bool wait_to_end, THD *except_caller_thd)
}
DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count));
- WSREP_DEBUG("waiting for client connections to close: %u", thread_count);
+ WSREP_DEBUG("Waiting for client connections to close: %u", thread_count);
+ WSREP_DEBUG("Waiting for rollbacker threads to close: %lu", wsrep_running_rollbacker_threads);
+ WSREP_DEBUG("Waiting for applier threads to close: %lu", wsrep_running_applier_threads);
+ WSREP_DEBUG("Waiting for killer threads to close: %lu", wsrep_running_killer_threads);
+ WSREP_DEBUG("Waiting for wsrep threads to close: %lu", wsrep_running_threads);
while (wait_to_end && have_client_connections())
{
@@ -2452,7 +2479,7 @@ void wsrep_close_threads(THD *thd)
DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
(longlong) tmp->thread_id));
/* We skip slave threads & scheduler on this first loop through. */
- if (tmp->wsrep_applier && tmp != thd)
+ if ((tmp->wsrep_applier || tmp->wsrep_killer) && tmp != thd)
{
WSREP_DEBUG("closing wsrep thread %lld", (longlong) tmp->thread_id);
wsrep_close_thread (tmp);
@@ -2466,7 +2493,7 @@ void wsrep_wait_appliers_close(THD *thd)
{
/* Wait for wsrep appliers to gracefully exit */
mysql_mutex_lock(&LOCK_thread_count);
- while (wsrep_running_threads > 1)
+ while (wsrep_running_threads > 2)
// 1 is for rollbacker thread which needs to be killed explicitly.
// This gotta be fixed in a more elegant manner if we gonna have arbitrary
// number of non-applier wsrep threads.
@@ -2995,3 +3022,52 @@ bool wsrep_node_is_synced()
{
return (WSREP_ON) ? (wsrep_config_state->get_status() == 4) : false;
}
+
+bool wsrep_enqueue_background_kill(wsrep_kill_t *item)
+{
+ std::list< wsrep_kill_t* >::iterator it;
+ bool inserted= false;
+
+ mysql_mutex_lock(&LOCK_wsrep_kill);
+
+ for (it = wsrep_kill_list.begin(); it != wsrep_kill_list.end(); it++)
+ {
+ if ((*it)->victim_thd == item->victim_thd)
+ break;
+ }
+
+ if(it != wsrep_kill_list.end())
+ {
+ WSREP_DEBUG("Thread: %lld query: %s already on kill list",
+ item->victim_thd->thread_id, wsrep_thd_query(item->victim_thd));
+ }
+ else
+ {
+ wsrep_kill_list.push_back(item);
+ mysql_cond_signal(&COND_wsrep_kill);
+ inserted= true;
+ }
+
+ mysql_mutex_unlock(&LOCK_wsrep_kill);
+ return inserted;
+}
+
+wsrep_kill_t* wsrep_dequeue_background_kill()
+{
+ wsrep_kill_t* item= NULL;
+
+ mysql_mutex_assert_owner(&LOCK_wsrep_kill);
+
+ if (!wsrep_kill_list.empty())
+ {
+ item= wsrep_kill_list.front();
+ wsrep_kill_list.pop_front();
+ }
+
+ return item;
+}
+
+bool wsrep_kill_empty()
+{
+ return wsrep_kill_list.empty();
+}
diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h
index 96b3e63914b..8258bc32f6b 100644
--- a/sql/wsrep_mysqld.h
+++ b/sql/wsrep_mysqld.h
@@ -56,6 +56,15 @@ struct wsrep_thd_shadow {
longlong row_count_func;
};
+typedef struct wsrep_kill {
+ THD* victim_thd;
+ THD* bf_thd;
+ uint64_t victim_id;
+ uint64_t bf_id;
+ bool signal;
+ bool wait_lock;
+} wsrep_kill_t;
+
// Global wsrep parameters
extern wsrep_t* wsrep;
@@ -92,6 +101,7 @@ extern my_bool wsrep_slave_UK_checks;
extern ulong wsrep_running_threads;
extern ulong wsrep_running_applier_threads;
extern ulong wsrep_running_rollbacker_threads;
+extern ulong wsrep_running_killer_threads;
extern bool wsrep_new_cluster;
extern bool wsrep_gtid_mode;
extern uint32 wsrep_gtid_domain_id;
@@ -234,8 +244,6 @@ extern wsrep_seqno_t wsrep_locked_seqno;
#define WSREP_PROVIDER_EXISTS \
(wsrep_provider && strncasecmp(wsrep_provider, WSREP_NONE, FN_REFLEN))
-#define WSREP_QUERY(thd) (thd->query())
-
extern my_bool wsrep_ready_get();
extern void wsrep_ready_wait();
@@ -265,6 +273,8 @@ extern mysql_cond_t COND_wsrep_replaying;
extern mysql_mutex_t LOCK_wsrep_slave_threads;
extern mysql_mutex_t LOCK_wsrep_desync;
extern mysql_mutex_t LOCK_wsrep_config_state;
+extern mysql_mutex_t LOCK_wsrep_kill;
+extern mysql_cond_t COND_wsrep_kill;
extern wsrep_aborting_thd_t wsrep_aborting_thd;
extern my_bool wsrep_emulate_bin_log;
extern int wsrep_to_isolation;
@@ -289,6 +299,8 @@ extern PSI_mutex_key key_LOCK_wsrep_replaying;
extern PSI_cond_key key_COND_wsrep_replaying;
extern PSI_mutex_key key_LOCK_wsrep_slave_threads;
extern PSI_mutex_key key_LOCK_wsrep_desync;
+extern PSI_mutex_key key_LOCK_wsrep_kill;
+extern PSI_cond_key key_COND_wsrep_kill;
extern PSI_file_key key_file_wsrep_gra_log;
@@ -296,6 +308,7 @@ extern PSI_thread_key key_wsrep_sst_joiner;
extern PSI_thread_key key_wsrep_sst_donor;
extern PSI_thread_key key_wsrep_rollbacker;
extern PSI_thread_key key_wsrep_applier;
+extern PSI_thread_key key_wsrep_killer;
#endif /* HAVE_PSI_INTERFACE */
@@ -322,7 +335,8 @@ void thd_binlog_trx_reset(THD * thd);
enum wsrep_thread_type {
WSREP_APPLIER_THREAD=1,
- WSREP_ROLLBACKER_THREAD=2
+ WSREP_ROLLBACKER_THREAD=2,
+ WSREP_KILLER_THREAD=3
};
typedef void (*wsrep_thd_processor_fun)(THD *);
@@ -369,6 +383,8 @@ void wsrep_keys_free(wsrep_key_arr_t* key_arr);
((wsrep_forced_binlog_format != BINLOG_FORMAT_UNSPEC) ? \
wsrep_forced_binlog_format : my_format)
+bool wsrep_kill_empty();
+
#else /* WITH_WSREP */
#define WSREP(T) (0)
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc
index a7b6e8ff1b1..cff92cd3484 100644
--- a/sql/wsrep_thd.cc
+++ b/sql/wsrep_thd.cc
@@ -27,6 +27,7 @@
#include "rpl_filter.h"
#include "rpl_rli.h"
#include "rpl_mi.h"
+#include "debug_sync.h"
#if (__LP64__)
static volatile int64 wsrep_bf_aborts_counter(0);
@@ -424,12 +425,25 @@ static bool create_wsrep_THD(wsrep_thread_args* args, bool thread_count_lock)
mysql_mutex_lock(&LOCK_thread_count);
ulong old_wsrep_running_threads= wsrep_running_threads;
+ PSI_thread_key key;
- DBUG_ASSERT(args->thread_type == WSREP_APPLIER_THREAD ||
- args->thread_type == WSREP_ROLLBACKER_THREAD);
+ switch(args->thread_type)
+ {
+ case WSREP_APPLIER_THREAD:
+ key = key_wsrep_applier;
+ break;
+ case WSREP_ROLLBACKER_THREAD:
+ key = key_wsrep_rollbacker;
+ break;
+ case WSREP_KILLER_THREAD:
+ key = key_wsrep_killer;
+ break;
+ default:
+ WSREP_ERROR("Incorrect thread type %d", args->thread_type);
+ assert(0);
+ }
- bool res= mysql_thread_create(args->thread_type == WSREP_APPLIER_THREAD
- ? key_wsrep_applier : key_wsrep_rollbacker,
+ bool res= mysql_thread_create(key,
&args->thread_id, &connection_attrib,
start_wsrep_THD, (void*)args);
@@ -457,6 +471,335 @@ static bool create_wsrep_THD(wsrep_thread_args* args, bool thread_count_lock)
return res;
}
+static void wsrep_abort_slave_trx(long long bf_seqno, long long victim_seqno)
+{
+ WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be "
+ "caused by:\n\t"
+ "1) unsupported configuration options combination, please check documentation.\n\t"
+ "2) a bug in the code.\n\t"
+ "3) a database corruption.\n Node consistency compromized, "
+ "need to abort. Restart the node to resync with cluster.",
+ bf_seqno, victim_seqno);
+ abort();
+}
+
+static int wsrep_kill(wsrep_kill_t* item)
+{
+ bool signal= item->signal;
+ THD* thd= item->victim_thd;
+ THD* bf_thd= item->bf_thd;
+ unsigned long long victim_id= static_cast<unsigned long long>(item->victim_id);
+ unsigned long long bf_id= static_cast<unsigned long long>(item->bf_id);
+ unsigned long victim_thread= thd_get_thread_id(thd);
+ unsigned long bf_thread= thd_get_thread_id(bf_thd);
+ long long bf_seqno= wsrep_thd_trx_seqno(bf_thd);
+ long long victim_seqno= wsrep_thd_trx_seqno(thd);
+
+ wsrep_thd_LOCK(thd);
+
+ WSREP_LOG_CONFLICT(bf_thd, thd, TRUE);
+
+ WSREP_DEBUG("Aborter %s trx_id: %llu thread: %ld "
+ "seqno: %lld query_state: %s conflict_state: %s query: %s",
+ wsrep_thd_is_BF(bf_thd, false) ? "BF" : "normal",
+ bf_id,
+ bf_thread,
+ bf_seqno,
+ wsrep_thd_query_state_str(thd),
+ wsrep_thd_conflict_state_str(thd),
+ wsrep_thd_query(bf_thd));
+
+ WSREP_DEBUG("Victim %s trx_id: %llu thread: %ld "
+ "seqno: %lld query_state: %s conflict_state: %s query: %s",
+ wsrep_thd_is_BF(thd, false) ? "BF" : "normal",
+ victim_id,
+ victim_thread,
+ victim_seqno,
+ wsrep_thd_query_state_str(thd),
+ wsrep_thd_conflict_state_str(thd),
+ wsrep_thd_query(thd));
+
+ DBUG_EXECUTE_IF("sync.wsrep_after_BF_victim_lock",
+ {
+ const char act[]=
+ "now "
+ "wait_for signal.wsrep_after_BF_victim_lock";
+ DBUG_ASSERT(!debug_sync_set_action(bf_thd,
+ STRING_WITH_LEN(act)));
+ };);
+
+ if (wsrep_thd_query_state(thd) == QUERY_EXITING)
+ {
+ WSREP_DEBUG("Victim query state QUERY_EXITING trx: %llu"
+ " thread: %lu",
+ victim_id,
+ victim_thread);
+ wsrep_thd_UNLOCK(thd);
+ return(0);
+ }
+
+ if (wsrep_thd_exec_mode(thd) != LOCAL_STATE)
+ {
+ WSREP_DEBUG("Victim withdraw of non local for BF trx: %llu "
+ ", thread: %lu exec_mode: %s",
+ victim_id,
+ victim_thread,
+ wsrep_thd_exec_mode_str(thd));
+ }
+
+ switch (wsrep_thd_get_conflict_state(thd))
+ {
+ case NO_CONFLICT:
+ WSREP_DEBUG("Victim thread: %lu trx: %llu in NO_CONFLICT state",
+ victim_thread,
+ victim_id);
+ wsrep_thd_set_conflict_state(thd, MUST_ABORT);
+ break;
+ case MUST_ABORT:
+ WSREP_DEBUG("Victim thread: %lu trx: %llu in MUST_ABORT state",
+ victim_thread,
+ victim_id);
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+ return(0);
+ break;
+ case ABORTED:
+ case ABORTING: // fall through
+ default:
+ WSREP_DEBUG("Victim thread: %lu trx: %llu in state: %s",
+ victim_thread,
+ victim_id,
+ wsrep_thd_conflict_state_str(thd));
+ wsrep_thd_UNLOCK(thd);
+ return(0);
+ break;
+ }
+
+ switch (wsrep_thd_query_state(thd))
+ {
+ case QUERY_COMMITTING:
+ {
+ enum wsrep_status rcode=WSREP_OK;
+
+ WSREP_DEBUG("Victim kill trx QUERY_COMMITTING state thread: %ld trx: %llu",
+ victim_thread,
+ victim_id);
+
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV)
+ {
+ WSREP_DEBUG("Victim REPL_RECV abort slave thread: %ld trx: %llu"
+ " bf_seqno: %lld victim_seqno: %lld",
+ victim_thread,
+ victim_id,
+ bf_seqno,
+ victim_seqno);
+
+ wsrep_abort_slave_trx(bf_seqno, victim_seqno);
+ }
+ else
+ {
+ wsrep_t *wsrep= get_wsrep();
+
+ rcode= wsrep->abort_pre_commit(wsrep, bf_seqno,
+ (wsrep_trx_id_t)wsrep_thd_ws_handle(thd)->trx_id);
+
+ switch (rcode)
+ {
+ case WSREP_WARNING:
+ {
+ WSREP_DEBUG("Victim cancel commit warning thread: %lu trx: %llu",
+ victim_thread,
+ victim_id);
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+ return(1);
+ break;
+ }
+ case WSREP_OK:
+ break;
+ default:
+ {
+ WSREP_ERROR("Victim cancel commit bad commit exit thread: "
+ "%lu trx: %llu rcode: %d ",
+ victim_thread,
+ victim_id,
+ rcode);
+ /* unable to interrupt, must abort */
+ /* note: kill_mysql() will block, if we cannot.
+ * kill the lock holder first. */
+ abort();
+ break;
+ }
+ }
+ }
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+ break;
+ }
+ case QUERY_EXEC:
+ {
+ /* it is possible that victim trx is itself waiting for some
+ * other lock. We need to cancel this waiting */
+ WSREP_DEBUG("Victim kill trx QUERY_EXEC state thread: %ld trx: %llu",
+ victim_thread, victim_id);
+
+ bool wait_lock= item->wait_lock;
+
+ if (wait_lock)
+ {
+ WSREP_DEBUG("Victim thread: %lu trx: %llu has lock wait flag",
+ victim_thread,
+ victim_id);
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+ }
+ else
+ {
+ /* Abort currently executing query */
+ WSREP_DEBUG("Kill query for victim thread: %lu trx: %llu",
+ victim_thread,
+ victim_id);
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+
+ /* for BF thd, we need to prevent him from committing */
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV)
+ {
+ WSREP_DEBUG("Victim REPL_RECV abort slave for thread: "
+ "%lu trx: %llu"
+ " bf_seqno: %lld victim_seqno: %lld",
+ victim_thread,
+ victim_id,
+ bf_seqno,
+ victim_seqno);
+
+ wsrep_abort_slave_trx(bf_seqno, victim_seqno);
+ }
+ }
+ break;
+ }
+ case QUERY_IDLE:
+ {
+ WSREP_DEBUG("Victim kill trx QUERY_IDLE state thread: %ld trx: %llu",
+ victim_thread,
+ victim_id);
+
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV)
+ {
+ WSREP_DEBUG("Victim REPL_RECV kill BF IDLE, thread: %ld trx: "
+ "%llu bf_seqno: %lld victim_seqno: %lld",
+ victim_thread,
+ victim_id,
+ bf_seqno,
+ victim_seqno);
+
+ wsrep_thd_UNLOCK(thd);
+ wsrep_abort_slave_trx(bf_seqno, victim_seqno);
+ return(0);
+ }
+
+ /* This will lock thd from proceeding after net_read() */
+ wsrep_thd_set_conflict_state(thd, ABORTING);
+
+ wsrep_lock_rollback();
+
+ if (wsrep_aborting_thd_contains(thd))
+ {
+ WSREP_WARN("Victim is duplicate thd aborter thread: %ld trx: %llu",
+ victim_thread,
+ victim_id);
+ }
+ else
+ {
+ wsrep_aborting_thd_enqueue(thd);
+ WSREP_DEBUG("Enqueuing victim thread: %ld trx: %llu for abort",
+ victim_thread,
+ victim_id);
+ }
+
+ wsrep_unlock_rollback();
+ wsrep_thd_UNLOCK(thd);
+
+ break;
+ }
+ default:
+ {
+ WSREP_WARN("Victim thread: %ld trx: %llu in bad wsrep query state: %s",
+ victim_thread,
+ victim_id,
+ wsrep_thd_query_state_str(thd));
+ wsrep_thd_UNLOCK(thd);
+ break;
+ }
+ }
+
+ return(0);
+}
+
+static void wsrep_process_kill(THD *thd)
+{
+ DBUG_ENTER("wsrep_process_kill");
+
+ mysql_mutex_lock(&LOCK_wsrep_kill);
+
+ WSREP_DEBUG("WSREP killer thread started");
+
+ while (thd->killed == NOT_KILLED)
+ {
+ wsrep_kill_t* to_be_killed;
+
+ thd_proc_info(thd, "wsrep killer idle");
+ thd->mysys_var->current_mutex= &LOCK_wsrep_kill;
+ thd->mysys_var->current_cond= &COND_wsrep_kill;
+
+ mysql_cond_wait(&COND_wsrep_kill,&LOCK_wsrep_kill);
+
+ WSREP_DEBUG("WSREP killer thread wakes for signal");
+
+ mysql_mutex_lock(&thd->mysys_var->mutex);
+ thd_proc_info(thd, "wsrep killer active");
+ thd->mysys_var->current_mutex= 0;
+ thd->mysys_var->current_cond= 0;
+ mysql_mutex_unlock(&thd->mysys_var->mutex);
+
+ /* process all entries in the queue */
+ while ((to_be_killed= wsrep_dequeue_background_kill()))
+ {
+ /* Here we do not have InnoDB lock sys mutex or trx mutex
+ so it is safe to take LOCK_thd_data. */
+ wsrep_kill(to_be_killed);
+ my_free(to_be_killed);
+ to_be_killed= NULL;
+ }
+ }
+
+ assert(wsrep_kill_empty());
+
+ mysql_mutex_unlock(&LOCK_wsrep_kill);
+ sql_print_information("WSREP: killer thread exiting");
+ DBUG_PRINT("wsrep",("wsrep killer thread exiting"));
+ DBUG_VOID_RETURN;
+}
+
+
+void wsrep_create_killer()
+{
+ wsrep_thread_args* arg;
+ if((arg = (wsrep_thread_args*)my_malloc(sizeof(wsrep_thread_args), MYF(0))) == NULL) {
+ WSREP_ERROR("Can't allocate memory for wsrep background killer thread");
+ assert(0);
+ }
+
+ arg->thread_type = WSREP_KILLER_THREAD;
+ arg->processor = wsrep_process_kill;
+
+ if (create_wsrep_THD(arg, false)) {
+ WSREP_WARN("Can't create thread to manage wsrep background kill");
+ my_free(arg);
+ return;
+ }
+}
+
bool wsrep_create_appliers(long threads, bool thread_count_lock)
{
if (!wsrep_connected)
@@ -552,7 +895,7 @@ static void wsrep_rollback_process(THD *thd)
mysql_mutex_unlock(&aborting->LOCK_thd_data);
- set_current_thd(aborting);
+ set_current_thd(aborting);
aborting->store_globals();
mysql_mutex_lock(&aborting->LOCK_thd_data);
@@ -562,7 +905,7 @@ static void wsrep_rollback_process(THD *thd)
(longlong) aborting->real_id);
mysql_mutex_unlock(&aborting->LOCK_thd_data);
- set_current_thd(thd);
+ set_current_thd(thd);
thd->store_globals();
mysql_mutex_lock(&LOCK_wsrep_rollback);
diff --git a/sql/wsrep_thd.h b/sql/wsrep_thd.h
index 8d928014518..40ce4b847e6 100644
--- a/sql/wsrep_thd.h
+++ b/sql/wsrep_thd.h
@@ -28,6 +28,7 @@ void wsrep_client_rollback(THD *thd);
void wsrep_replay_transaction(THD *thd);
bool wsrep_create_appliers(long threads, bool thread_count_lock=false);
void wsrep_create_rollbacker();
+void wsrep_create_killer();
int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr,
my_bool signal);
diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc
index 9777cc6ec62..8b2b7c13cdc 100644
--- a/sql/wsrep_var.cc
+++ b/sql/wsrep_var.cc
@@ -500,6 +500,7 @@ bool wsrep_cluster_address_update (sys_var *self, THD* thd, enum_var_type type)
if (wsrep_start_replication())
{
wsrep_create_rollbacker();
+ wsrep_create_killer();
WSREP_DEBUG("Cluster address update creating %ld applier threads running %lu",
wsrep_slave_threads, wsrep_running_applier_threads);
wsrep_create_appliers(wsrep_slave_threads);
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index 0338e2b682a..659ce188641 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -5238,8 +5238,6 @@ static void innobase_kill_query(handlerton*, THD* thd, enum thd_kill_levels)
/* if victim has been signaled by BF thread and/or aborting
is already progressing, following query aborting is not necessary
any more.
- Also, BF thread should own trx mutex for the victim, which would
- conflict with trx_mutex_enter() below
*/
DBUG_VOID_RETURN;
}
@@ -19611,41 +19609,28 @@ static struct st_mysql_storage_engine innobase_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
#ifdef WITH_WSREP
-void
-wsrep_abort_slave_trx(
-/*==================*/
- wsrep_seqno_t bf_seqno,
- wsrep_seqno_t victim_seqno)
-{
- WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be "
- "caused by:\n\t"
- "1) unsupported configuration options combination, please check documentation.\n\t"
- "2) a bug in the code.\n\t"
- "3) a database corruption.\n Node consistency compromized, "
- "need to abort. Restart the node to resync with cluster.",
- (long long)bf_seqno, (long long)victim_seqno);
- abort();
-}
-/*******************************************************************//**
-This function is used to kill one transaction in BF. */
+/** This function is used to kill one transaction in BF.
+@param[in] bf_thd_ptr BF THD
+@param[in] bf_trx BF transaction
+@param[in] victim_trx Victim to be killed
+@param[in] signal signal victim? */
UNIV_INTERN
int
wsrep_innobase_kill_one_trx(
-/*========================*/
- void * const bf_thd_ptr,
- const trx_t * const bf_trx,
- trx_t *victim_trx,
- ibool signal)
+ void* const bf_thd_ptr,
+ const trx_t* const bf_trx,
+ trx_t* victim_trx,
+ bool signal)
{
ut_ad(lock_mutex_own());
ut_ad(trx_mutex_own(victim_trx));
- ut_ad(bf_thd_ptr);
- ut_ad(victim_trx);
+ ut_a(bf_thd_ptr);
+ ut_a(victim_trx);
DBUG_ENTER("wsrep_innobase_kill_one_trx");
- THD *bf_thd = bf_thd_ptr ? (THD*) bf_thd_ptr : NULL;
+
+ THD *bf_thd = (THD *) bf_thd_ptr;
THD *thd = (THD *) victim_trx->mysql_thd;
- int64_t bf_seqno = (bf_thd) ? wsrep_thd_trx_seqno(bf_thd) : 0;
if (!thd) {
DBUG_PRINT("wsrep", ("no thd for conflicting lock"));
@@ -19653,198 +19638,31 @@ wsrep_innobase_kill_one_trx(
DBUG_RETURN(1);
}
- if (!bf_thd) {
- DBUG_PRINT("wsrep", ("no BF thd for conflicting lock"));
- WSREP_WARN("no BF THD for trx: " TRX_ID_FMT,
- bf_trx ? bf_trx->id : 0);
- DBUG_RETURN(1);
- }
-
- WSREP_LOG_CONFLICT(bf_thd, thd, TRUE);
-
- WSREP_DEBUG("BF kill (" ULINTPF ", seqno: " INT64PF
- "), victim: (%lu) trx: " TRX_ID_FMT,
- signal, bf_seqno,
- thd_get_thread_id(thd),
- victim_trx->id);
-
- WSREP_DEBUG("Aborting query: %s conf %d trx: %" PRId64,
- (thd && wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void",
- wsrep_thd_conflict_state(thd, FALSE),
- wsrep_thd_ws_handle(thd)->trx_id);
+ /* Actual processing of the victim kill is handled later
+ on background thread. At this point we may not hold
+ LOCK_thd_data mutex as we are already holding lock sys
+ and trx mutex. */
+ wsrep_kill_t* item = static_cast<wsrep_kill_t*>
+ (my_malloc(sizeof(wsrep_kill_t), MYF(0)));
- wsrep_thd_LOCK(thd);
- DBUG_EXECUTE_IF("sync.wsrep_after_BF_victim_lock",
- {
- const char act[]=
- "now "
- "wait_for signal.wsrep_after_BF_victim_lock";
- DBUG_ASSERT(!debug_sync_set_action(bf_thd,
- STRING_WITH_LEN(act)));
- };);
-
-
- if (wsrep_thd_query_state(thd) == QUERY_EXITING) {
- WSREP_DEBUG("kill trx EXITING for " TRX_ID_FMT,
- victim_trx->id);
- wsrep_thd_UNLOCK(thd);
- DBUG_RETURN(0);
+ if (!item) {
+ WSREP_ERROR("Failed to allocate memory to kill thread.");
+ DBUG_RETURN(1);
}
- if (wsrep_thd_exec_mode(thd) != LOCAL_STATE) {
- WSREP_DEBUG("withdraw for BF trx: " TRX_ID_FMT ", state: %d",
- victim_trx->id,
- wsrep_thd_get_conflict_state(thd));
- }
+ item->victim_thd = thd;
+ item->victim_id = victim_trx->id;
+ item->bf_thd = bf_thd;
+ item->bf_id = bf_trx ? bf_trx->id : TRX_ID_MAX;
+ item->signal = signal;
+ item->wait_lock = (victim_trx->lock.wait_lock ? true : false);
- switch (wsrep_thd_get_conflict_state(thd)) {
- case NO_CONFLICT:
- wsrep_thd_set_conflict_state(thd, MUST_ABORT);
- break;
- case MUST_ABORT:
- WSREP_DEBUG("victim " TRX_ID_FMT " in MUST ABORT state",
- victim_trx->id);
- wsrep_thd_UNLOCK(thd);
- wsrep_thd_awake(thd, signal);
- DBUG_RETURN(0);
- break;
- case ABORTED:
- case ABORTING: // fall through
- default:
- WSREP_DEBUG("victim " TRX_ID_FMT " in state %d",
- victim_trx->id, wsrep_thd_get_conflict_state(thd));
- wsrep_thd_UNLOCK(thd);
- DBUG_RETURN(0);
- break;
+ /* If victim itself is waiting a lock, cancel wait lock. */
+ if (victim_trx->lock.wait_lock) {
+ lock_cancel_waiting_and_release(victim_trx->lock.wait_lock);
}
- switch (wsrep_thd_query_state(thd)) {
- case QUERY_COMMITTING:
- enum wsrep_status rcode;
-
- WSREP_DEBUG("kill query for: %ld",
- thd_get_thread_id(thd));
- WSREP_DEBUG("kill trx QUERY_COMMITTING for " TRX_ID_FMT,
- victim_trx->id);
-
- if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
- wsrep_abort_slave_trx(bf_seqno,
- wsrep_thd_trx_seqno(thd));
- } else {
- wsrep_t *wsrep= get_wsrep();
- rcode = wsrep->abort_pre_commit(
- wsrep, bf_seqno,
- (wsrep_trx_id_t)wsrep_thd_ws_handle(thd)->trx_id
- );
-
- switch (rcode) {
- case WSREP_WARNING:
- WSREP_DEBUG("cancel commit warning: "
- TRX_ID_FMT,
- victim_trx->id);
- wsrep_thd_UNLOCK(thd);
- wsrep_thd_awake(thd, signal);
- DBUG_RETURN(1);
- break;
- case WSREP_OK:
- break;
- default:
- WSREP_ERROR(
- "cancel commit bad exit: %d "
- TRX_ID_FMT,
- rcode, victim_trx->id);
- /* unable to interrupt, must abort */
- /* note: kill_mysql() will block, if we cannot.
- * kill the lock holder first.
- */
- abort();
- break;
- }
- }
- wsrep_thd_UNLOCK(thd);
- wsrep_thd_awake(thd, signal);
- break;
- case QUERY_EXEC:
- /* it is possible that victim trx is itself waiting for some
- * other lock. We need to cancel this waiting
- */
- WSREP_DEBUG("kill trx QUERY_EXEC for " TRX_ID_FMT,
- victim_trx->id);
-
- victim_trx->lock.was_chosen_as_deadlock_victim= TRUE;
-
- if (victim_trx->lock.wait_lock) {
- WSREP_DEBUG("victim has wait flag: %ld",
- thd_get_thread_id(thd));
- lock_t* wait_lock = victim_trx->lock.wait_lock;
-
- if (wait_lock) {
- WSREP_DEBUG("canceling wait lock");
- victim_trx->lock.was_chosen_as_deadlock_victim= TRUE;
- lock_cancel_waiting_and_release(wait_lock);
- }
-
- wsrep_thd_UNLOCK(thd);
- wsrep_thd_awake(thd, signal);
- } else {
- /* abort currently executing query */
- DBUG_PRINT("wsrep",("sending KILL_QUERY to: %lu",
- thd_get_thread_id(thd)));
- WSREP_DEBUG("kill query for: %ld",
- thd_get_thread_id(thd));
- /* Note that innobase_kill_query will take lock_mutex
- and trx_mutex */
- wsrep_thd_UNLOCK(thd);
- wsrep_thd_awake(thd, signal);
-
- /* for BF thd, we need to prevent him from committing */
- if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
- wsrep_abort_slave_trx(bf_seqno,
- wsrep_thd_trx_seqno(thd));
- }
- }
- break;
- case QUERY_IDLE:
- {
- WSREP_DEBUG("kill IDLE for " TRX_ID_FMT, victim_trx->id);
-
- if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
- WSREP_DEBUG("kill BF IDLE, seqno: %lld",
- (long long)wsrep_thd_trx_seqno(thd));
- wsrep_thd_UNLOCK(thd);
- wsrep_abort_slave_trx(bf_seqno,
- wsrep_thd_trx_seqno(thd));
- DBUG_RETURN(0);
- }
- /* This will lock thd from proceeding after net_read() */
- wsrep_thd_set_conflict_state(thd, ABORTING);
-
- wsrep_lock_rollback();
-
- if (wsrep_aborting_thd_contains(thd)) {
- WSREP_WARN("duplicate thd aborter %lu",
- (ulong) thd_get_thread_id(thd));
- } else {
- wsrep_aborting_thd_enqueue(thd);
- DBUG_PRINT("wsrep",("enqueuing trx abort for %lu",
- thd_get_thread_id(thd)));
- WSREP_DEBUG("enqueuing trx abort for (%lu)",
- thd_get_thread_id(thd));
- }
-
- DBUG_PRINT("wsrep",("signalling wsrep rollbacker"));
- WSREP_DEBUG("signaling aborter");
- wsrep_unlock_rollback();
- wsrep_thd_UNLOCK(thd);
-
- break;
- }
- default:
- WSREP_WARN("bad wsrep query state: %d",
- wsrep_thd_query_state(thd));
- wsrep_thd_UNLOCK(thd);
- break;
- }
+ wsrep_enqueue_background_kill(item);
DBUG_RETURN(0);
}
@@ -19860,10 +19678,13 @@ wsrep_abort_transaction(
{
DBUG_ENTER("wsrep_innobase_abort_thd");
+ ut_a(bf_thd);
+ ut_a(victim_thd);
+
trx_t* victim_trx = thd_to_trx(victim_thd);
- trx_t* bf_trx = (bf_thd) ? thd_to_trx(bf_thd) : NULL;
+ trx_t* bf_trx = thd_to_trx(bf_thd);
- WSREP_DEBUG("abort transaction: BF: %s victim: %s victim conf: %d",
+ WSREP_DEBUG("Abort transaction: BF: %s victim: %s victim conf: %d",
wsrep_thd_query(bf_thd),
wsrep_thd_query(victim_thd),
wsrep_thd_conflict_state(victim_thd, FALSE));
@@ -19871,8 +19692,10 @@ wsrep_abort_transaction(
if (victim_trx) {
lock_mutex_enter();
trx_mutex_enter(victim_trx);
- int rcode = wsrep_innobase_kill_one_trx(bf_thd, bf_trx,
- victim_trx, signal);
+ int rcode = wsrep_innobase_kill_one_trx(bf_thd,
+ bf_trx,
+ victim_trx,
+ static_cast<bool>(signal));
lock_mutex_exit();
trx_mutex_exit(victim_trx);
wsrep_srv_conc_cancel_wait(victim_trx);
diff --git a/storage/innobase/include/ha_prototypes.h b/storage/innobase/include/ha_prototypes.h
index 693dcd15163..27d1137e808 100644
--- a/storage/innobase/include/ha_prototypes.h
+++ b/storage/innobase/include/ha_prototypes.h
@@ -238,7 +238,7 @@ int
wsrep_innobase_kill_one_trx(void * const thd_ptr,
const trx_t * const bf_trx,
trx_t *victim_trx,
- ibool signal);
+ bool signal);
int wsrep_innobase_mysql_sort(int mysql_type, uint charset_number,
unsigned char* str, unsigned int str_length,
unsigned int buf_length);
diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc
index 006ecc28276..408f2e0c16a 100644
--- a/storage/innobase/lock/lock0lock.cc
+++ b/storage/innobase/lock/lock0lock.cc
@@ -1,7 +1,7 @@
/*****************************************************************************
Copyright (c) 1996, 2017, Oracle and/or its affiliates. All Rights Reserved.
-Copyright (c) 2014, 2019, MariaDB Corporation.
+Copyright (c) 2014, 2020, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -1150,7 +1150,7 @@ wsrep_kill_victim(
}
wsrep_innobase_kill_one_trx(trx->mysql_thd,
- trx, lock->trx, TRUE);
+ trx, lock->trx, true);
}
}
}