diff options
author | Jan Lindström <jan.lindstrom@mariadb.com> | 2020-03-12 15:34:50 +0200 |
---|---|---|
committer | Jan Lindström <jan.lindstrom@mariadb.com> | 2020-05-06 10:46:32 +0300 |
commit | e7071a039c8ca1a94b6f43c9b42c1e262991e5f9 (patch) | |
tree | 9b112fabb63183fd09cc11ea2f55054cfab8d5d5 | |
parent | 1af74d523a70622e8abb528d4fd991d72b90c887 (diff) | |
download | mariadb-git-bb-10.1-MDEV-21910.tar.gz |
MDEV-21910 : KIlling thread on Galera could cause mutex deadlockbb-10.1-MDEV-21910
Following issues here:
Whenever Galera BF (brute force) transaction decides to abort conflicting
transaction it will kill that thread using thd::awake()
User KILL [QUERY|CONNECTION] ... for a thread it will also call thd::awake()
Whenever one of these actions is executed we will hold number of InnoDB
internal mutexes and thd mutexes. Sometimes these mutexes are taken in
different order causing mutex deadlock.
Lets consider a example starting from Galera BF transaction deciding to abort
conflicting transaction (let's call this thread1):
thread1:
lock_rec_other_has_conflicting
here we hold both lock_sys mutex and trx mutex for conflicting lock transaction.
wsrep_innobase_kill_one_trx
For victim we call wsrep_thd_awake
Next thread2 we assume user to execute KILL QUERY to some other executing
query (note it can't be BF query).
thread2:
sql_kill()
kill_one_thread
find_thread_by_id
takes LOCK_thd_kill
thd->awake_no_mutex()
thread1:
thd->awake(KILL_QUERY)
Tries to have LOCK_thd_kill but it is hold by thread2 so we wait (note that
we still hold lock_sys mutex and trx mutex).
thread2:
ha_kill_query()
kill_handlerton
innobase_kill_query
lock_trx_handle_wait
lock_mutex_enter() must wait as thread1 is holding it
Thus thread1 lock_sys, trx_mutex waits -> thread2 LOCK_thd_kill waits lock_sys -> thread1
==> thread1 waits -> thread2 waits -> thread1 ==> mutex deadlock.
In this patch we will fix Galera BF and user kill cases so that we enqueue
victim thread to a list while we hold InnoDB mutexes and we then release them.
A new background thread will pick victim thread from this new list and uses
thd::awake() with no InnoDB mutexes. Idea is similar to replication background
kill. This fix enforces that we take LOCK_thd_data -> lock sys mutex -> trx mutex
always in this order.
wsrep_mysqld.cc
Here we introduce a list where victim threads are stored,
condition variable to be used to wake up background thread
and mutex to protect list.
wsrep_thd.cc
Create a new background thread to handle victim thread
abort. We may take wsrep_thd_LOCK mutex here but not any
InnoDB mutexes.
wsrep_innobase_kill_one_trx
Remove all the wsrep code that was moved to wsrep_thd.cc
We just enqueue required information to background kill
list and cancel victim trx lock wait if there is such.
Here we have InnoDB lock sys mutex and trx mutex so here
we can't take wsrep_thd_LOCK mutex.
wsrep_abort_transaction
Cleanup only.
34 files changed, 870 insertions, 534 deletions
diff --git a/include/mysql/service_wsrep.h b/include/mysql/service_wsrep.h index 499fbf2c173..d8f56d0b4a1 100644 --- a/include/mysql/service_wsrep.h +++ b/include/mysql/service_wsrep.h @@ -70,6 +70,7 @@ struct xid_t; struct wsrep; struct wsrep_ws_handle; struct wsrep_buf; +typedef struct wsrep_kill wsrep_kill_t; extern struct wsrep_service_st { struct wsrep * (*get_wsrep_func)(); @@ -112,6 +113,7 @@ extern struct wsrep_service_st { int (*wsrep_trx_order_before_func)(MYSQL_THD, MYSQL_THD); void (*wsrep_unlock_rollback_func)(); void (*wsrep_set_data_home_dir_func)(const char *data_dir); + bool (*wsrep_enqueue_background_kill_func)(wsrep_kill_t); } *wsrep_service; #ifdef MYSQL_DYNAMIC_PLUGIN @@ -155,6 +157,7 @@ extern struct wsrep_service_st { #define wsrep_trx_order_before(T1,T2) wsrep_service->wsrep_trx_order_before_func(T1,T2) #define wsrep_unlock_rollback() wsrep_service->wsrep_unlock_rollback_func() #define wsrep_set_data_home_dir(A) wsrep_service->wsrep_set_data_home_dir_func(A) +#define wsrep_enqueue_background_kill(T) wsrep_service->wsrep_enqueue_background_kill_func(T); #define wsrep_debug get_wsrep_debug() #define wsrep_log_conflicts get_wsrep_log_conflicts() @@ -215,6 +218,7 @@ bool wsrep_thd_ignore_table(THD *thd); void wsrep_unlock_rollback(); void wsrep_set_data_home_dir(const char *data_dir); +bool wsrep_enqueue_background_kill(wsrep_kill_t); #endif #ifdef __cplusplus diff --git a/mysql-test/suite/galera/r/galera_bf_kill.result b/mysql-test/suite/galera/r/galera_bf_kill.result new file mode 100644 index 00000000000..1ce2974f158 --- /dev/null +++ b/mysql-test/suite/galera/r/galera_bf_kill.result @@ -0,0 +1,49 @@ +connection node_2; +CREATE TABLE t1(a int not null primary key auto_increment,b int) engine=InnoDB; +insert into t1 values (NULL,1); +connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2a; +begin; +update t1 set a = 5; +connection node_2; +select * from t1; +a b +2 1 +disconnect node_2a; +connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2a; +begin; +update t1 set a =5; +connection node_2; +select * from t1; +a b +2 1 +disconnect node_2a; +connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2a; +begin; +update t1 set a =5, b=2; +connection node_2; +ALTER TABLE t1 ADD UNIQUE KEY(b); +select * from t1; +a b +2 1 +disconnect node_2a; +connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2a; +begin; +update t1 set a =5, b=2; +connect node_2b, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2b; +begin; +update t1 set a =6, b=7; +connection node_2; +ALTER TABLE t1 ADD UNIQUE KEY(b); +Warnings: +Note 1831 Duplicate index `b_2`. This is deprecated and will be disallowed in a future release +select * from t1; +a b +2 1 +disconnect node_2a; +disconnect node_2b; +drop table t1; diff --git a/mysql-test/suite/galera/r/galera_insert_multi.result b/mysql-test/suite/galera/r/galera_insert_multi.result index 33717781f2c..467684eb988 100644 --- a/mysql-test/suite/galera/r/galera_insert_multi.result +++ b/mysql-test/suite/galera/r/galera_insert_multi.result @@ -1,32 +1,32 @@ CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB; INSERT INTO t1 VALUES (1),(2); INSERT INTO t1 VALUES (3),(4); -SELECT COUNT(*) = 4 FROM t1; -COUNT(*) = 4 -1 -SELECT COUNT(*) = 4 FROM t1; -COUNT(*) = 4 -1 +SELECT COUNT(*) AS EXPECT_4 FROM t1; +EXPECT_4 +4 +SELECT COUNT(*) AS EXPECT_4 FROM t1; +EXPECT_4 +4 DROP TABLE t1; CREATE TABLE t1 (f1 INTEGER, KEY (f1)) ENGINE=InnoDB; INSERT INTO t1 VALUES (1),(1); INSERT INTO t1 VALUES (2),(2); -SELECT COUNT(*) = 4 FROM t1; -COUNT(*) = 4 -1 -SELECT COUNT(*) = 4 FROM t1; -COUNT(*) = 4 -1 +SELECT COUNT(*) AS EXPECT_4 FROM t1; +EXPECT_4 +4 +SELECT COUNT(*) AS EXPECT_4 FROM t1; +EXPECT_4 +4 DROP TABLE t1; CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB; INSERT INTO t1 VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (1); ERROR 23000: Duplicate entry '1' for key 'PRIMARY' -SELECT COUNT(*) = 0 FROM t1; -COUNT(*) = 0 -1 -SELECT COUNT(*) = 0 FROM t1; -COUNT(*) = 0 -1 +SELECT COUNT(*) AS EXPECT_0 FROM t1; +EXPECT_0 +0 +SELECT COUNT(*) AS EXPECT_0 FROM t1; +EXPECT_0 +0 DROP TABLE t1; CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB; SET AUTOCOMMIT = OFF; @@ -49,10 +49,10 @@ START TRANSACTION; INSERT INTO t1 VALUES (2), (1); ROLLBACK; COMMIT; -SELECT COUNT(*) = 2 FROM t1; -COUNT(*) = 2 -1 -SELECT COUNT(*) = 2 FROM t1; -COUNT(*) = 2 -1 +SELECT COUNT(*) AS EXPECT_2 FROM t1; +EXPECT_2 +2 +SELECT COUNT(*) AS EXPECT_2 FROM t1; +EXPECT_2 +2 DROP TABLE t1; diff --git a/mysql-test/suite/galera/r/galera_serializable.result b/mysql-test/suite/galera/r/galera_serializable.result index 90fe628e505..fb9e403892e 100644 --- a/mysql-test/suite/galera/r/galera_serializable.result +++ b/mysql-test/suite/galera/r/galera_serializable.result @@ -24,4 +24,10 @@ INSERT INTO t1 VALUES (1,1); INSERT INTO t1 VALUES (1,2); COMMIT; ERROR 40001: Deadlock found when trying to get lock; try restarting transaction +SELECT * from t1; +id f2 +1 2 +SELECT * from t1; +id f2 +1 2 DROP TABLE t1; diff --git a/mysql-test/suite/galera/r/galera_var_slave_threads.result b/mysql-test/suite/galera/r/galera_var_slave_threads.result index 5a78d84c24e..160b2c1872b 100644 --- a/mysql-test/suite/galera/r/galera_var_slave_threads.result +++ b/mysql-test/suite/galera/r/galera_var_slave_threads.result @@ -14,7 +14,7 @@ SELECT @@wsrep_slave_threads = 1; SET GLOBAL wsrep_slave_threads = 1; SELECT COUNT(*) FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user'; COUNT(*) -2 +3 SELECT COUNT(*) FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user' AND STATE LIKE '%wsrep aborter%'; COUNT(*) 1 diff --git a/mysql-test/suite/galera/t/MW-286.cnf b/mysql-test/suite/galera/t/MW-286.cnf new file mode 100644 index 00000000000..8ec4c2333cb --- /dev/null +++ b/mysql-test/suite/galera/t/MW-286.cnf @@ -0,0 +1,10 @@ +!include ../galera_2nodes.cnf + +[mysqld.1] +wsrep-debug=ON +wsrep-log-conflicts=ON + +[mysqld.2] +wsrep-debug=ON +wsrep-log-conflicts=ON + diff --git a/mysql-test/suite/galera/t/MW-286.test b/mysql-test/suite/galera/t/MW-286.test index 426b4493bb7..fcfbf4c62c3 100644 --- a/mysql-test/suite/galera/t/MW-286.test +++ b/mysql-test/suite/galera/t/MW-286.test @@ -3,8 +3,6 @@ # --source include/galera_cluster.inc ---source include/have_innodb.inc ---source include/big_test.inc --connection node_1 CREATE TABLE ten (f1 INTEGER) Engine=InnoDB; diff --git a/mysql-test/suite/galera/t/galera_as_slave_gtid_myisam.test b/mysql-test/suite/galera/t/galera_as_slave_gtid_myisam.test index faa9ddfd5c8..d8760d2b639 100644 --- a/mysql-test/suite/galera/t/galera_as_slave_gtid_myisam.test +++ b/mysql-test/suite/galera/t/galera_as_slave_gtid_myisam.test @@ -57,9 +57,13 @@ DROP TABLE t1; reset master; --connection node_2 +--let $wait_condition = SELECT COUNT(*) = 0 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 't1'; +--source include/wait_condition.inc STOP SLAVE; RESET SLAVE ALL; reset master; --connection node_3 +--let $wait_condition = SELECT COUNT(*) = 0 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 't1'; +--source include/wait_condition.inc reset master; diff --git a/mysql-test/suite/galera/t/galera_bf_abort_sleep.test b/mysql-test/suite/galera/t/galera_bf_abort_sleep.test index 8d135dc7d42..304e13fafd1 100644 --- a/mysql-test/suite/galera/t/galera_bf_abort_sleep.test +++ b/mysql-test/suite/galera/t/galera_bf_abort_sleep.test @@ -8,8 +8,8 @@ CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB; --connection node_2 -SET AUTOCOMMIT=OFF; --let $wsrep_local_bf_aborts_before = `SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_local_bf_aborts'` +SET AUTOCOMMIT=OFF; INSERT INTO t1 VALUES (1); --send SELECT SLEEP(1000); diff --git a/mysql-test/suite/galera/t/galera_bf_kill.cnf b/mysql-test/suite/galera/t/galera_bf_kill.cnf new file mode 100644 index 00000000000..8ec4c2333cb --- /dev/null +++ b/mysql-test/suite/galera/t/galera_bf_kill.cnf @@ -0,0 +1,10 @@ +!include ../galera_2nodes.cnf + +[mysqld.1] +wsrep-debug=ON +wsrep-log-conflicts=ON + +[mysqld.2] +wsrep-debug=ON +wsrep-log-conflicts=ON + diff --git a/mysql-test/suite/galera/t/galera_bf_kill.test b/mysql-test/suite/galera/t/galera_bf_kill.test new file mode 100644 index 00000000000..0ad86c13dba --- /dev/null +++ b/mysql-test/suite/galera/t/galera_bf_kill.test @@ -0,0 +1,118 @@ +--source include/galera_cluster.inc + +# +# Test case 1: Start a transaction on node_2a and kill it +# from other connection on same node +# + +--connection node_2 +CREATE TABLE t1(a int not null primary key auto_increment,b int) engine=InnoDB; +insert into t1 values (NULL,1); + +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +begin; +update t1 set a = 5; + +--connection node_2 + +--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1 +--source include/wait_condition.inc + +--let $k_thread = `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1` + +--disable_query_log +--eval KILL $k_thread +--enable_query_log + +select * from t1; +--disconnect node_2a + +# +# Test case 2: Start a transaction on node_2a and use +# kill query from other connection on same node +# + +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +begin; +update t1 set a =5; + +--connection node_2 +--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1 +--source include/wait_condition.inc + +--let $k_thread = `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1` + +--disable_query_log +--eval KILL QUERY $k_thread +--enable_query_log + +select * from t1; +--disconnect node_2a +# +# Test case 3: Start a transaction on node_2a and start a DDL on other transaction +# that will then abort node_2a transaction +# +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +begin; +update t1 set a =5, b=2; + +--connection node_2 +ALTER TABLE t1 ADD UNIQUE KEY b1(b); +ALTER TABLE t1 DROP KEY b1; + +select * from t1; + +--disconnect node_2a + +# +# Test case 4: Start a transaction on node_2a and conflicting transaction on node_2b +# and start a DDL on other transaction that will then abort node_2a and node_2b +# transactions +# + +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +begin; +update t1 set a =5, b=2; + +--connect node_2b, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2b +begin; +send update t1 set a =6, b=7; + +--connection node_2 +ALTER TABLE t1 ADD UNIQUE KEY b2(b); +ALTER TABLE t1 DROP KEY b2; + +select * from t1; + +--disconnect node_2a +--disconnect node_2b + +# +# Test case 5: Start a transaction on node_2a with wsrep disabled +# and start a DDL on other transaction that will then abort node_2a +# transactions +# + +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +SET SESSION wsrep_on=OFF; +begin; +update t1 set a =5, b=2; + +--connection node_2 +ALTER TABLE t1 ADD UNIQUE KEY b3(b); + +select * from t1; + +--disconnect node_2a + + +drop table t1; + + + diff --git a/mysql-test/suite/galera/t/galera_drop_database.cnf b/mysql-test/suite/galera/t/galera_drop_database.cnf new file mode 100644 index 00000000000..8ec4c2333cb --- /dev/null +++ b/mysql-test/suite/galera/t/galera_drop_database.cnf @@ -0,0 +1,10 @@ +!include ../galera_2nodes.cnf + +[mysqld.1] +wsrep-debug=ON +wsrep-log-conflicts=ON + +[mysqld.2] +wsrep-debug=ON +wsrep-log-conflicts=ON + diff --git a/mysql-test/suite/galera/t/galera_insert_multi.test b/mysql-test/suite/galera/t/galera_insert_multi.test index d62283aff69..e225b4bb199 100644 --- a/mysql-test/suite/galera/t/galera_insert_multi.test +++ b/mysql-test/suite/galera/t/galera_insert_multi.test @@ -13,10 +13,16 @@ INSERT INTO t1 VALUES (1),(2); INSERT INTO t1 VALUES (3),(4); --connection node_1 -SELECT COUNT(*) = 4 FROM t1; +--let $wait_condition = SELECT COUNT(*) = 4 FROM t1; +--source include/wait_condition.inc + +SELECT COUNT(*) AS EXPECT_4 FROM t1; --connection node_2 -SELECT COUNT(*) = 4 FROM t1; +--let $wait_condition = SELECT COUNT(*) = 4 FROM t1; +--source include/wait_condition.inc + +SELECT COUNT(*) AS EXPECT_4 FROM t1; DROP TABLE t1; @@ -32,10 +38,16 @@ INSERT INTO t1 VALUES (1),(1); INSERT INTO t1 VALUES (2),(2); --connection node_2 -SELECT COUNT(*) = 4 FROM t1; +--let $wait_condition = SELECT COUNT(*) = 4 FROM t1; +--source include/wait_condition.inc + +SELECT COUNT(*) AS EXPECT_4 FROM t1; --connection node_1 -SELECT COUNT(*) = 4 FROM t1; +--let $wait_condition = SELECT COUNT(*) = 4 FROM t1; +--source include/wait_condition.inc + +SELECT COUNT(*) AS EXPECT_4 FROM t1; DROP TABLE t1; @@ -49,10 +61,10 @@ CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB; --error ER_DUP_ENTRY INSERT INTO t1 VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (1); -SELECT COUNT(*) = 0 FROM t1; +SELECT COUNT(*) AS EXPECT_0 FROM t1; --connection node_2 -SELECT COUNT(*) = 0 FROM t1; +SELECT COUNT(*) AS EXPECT_0 FROM t1; DROP TABLE t1; @@ -107,10 +119,13 @@ ROLLBACK; --connection node_2 COMMIT; -SELECT COUNT(*) = 2 FROM t1; +SELECT COUNT(*) AS EXPECT_2 FROM t1; --connection node_1 -SELECT COUNT(*) = 2 FROM t1; +--let $wait_condition = SELECT COUNT(*) = 2 FROM t1; +--source include/wait_condition.inc + +SELECT COUNT(*) AS EXPECT_2 FROM t1; DROP TABLE t1; diff --git a/mysql-test/suite/galera/t/galera_parallel_autoinc_largetrx.test b/mysql-test/suite/galera/t/galera_parallel_autoinc_largetrx.test index 34558283462..3188ee19880 100644 --- a/mysql-test/suite/galera/t/galera_parallel_autoinc_largetrx.test +++ b/mysql-test/suite/galera/t/galera_parallel_autoinc_largetrx.test @@ -21,7 +21,7 @@ CREATE TABLE t1 (f1 INTEGER AUTO_INCREMENT PRIMARY KEY, f2 INTEGER) Engine=InnoD set session wsrep_sync_wait=15; --let $wsrep_slave_threads_orig = `SELECT @@wsrep_slave_threads` SET GLOBAL wsrep_slave_threads = 4; ---let $wait_condition = SELECT COUNT(*) = @@wsrep_slave_threads + 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user' AND (STATE IS NULL OR STATE NOT LIKE 'InnoDB%'); +--let $wait_condition = SELECT COUNT(*) = @@wsrep_slave_threads + 2 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user' AND (STATE IS NULL OR STATE NOT LIKE 'InnoDB%'); --source include/wait_condition.inc diff --git a/mysql-test/suite/galera/t/galera_parallel_autoinc_manytrx.test b/mysql-test/suite/galera/t/galera_parallel_autoinc_manytrx.test index d04603891db..1db5ee3bdc7 100644 --- a/mysql-test/suite/galera/t/galera_parallel_autoinc_manytrx.test +++ b/mysql-test/suite/galera/t/galera_parallel_autoinc_manytrx.test @@ -25,7 +25,7 @@ CREATE TABLE t1 (f1 INTEGER AUTO_INCREMENT PRIMARY KEY, f2 INTEGER) Engine=InnoD set session wsrep_sync_wait=15; --let $wsrep_slave_threads_orig = `SELECT @@wsrep_slave_threads` SET GLOBAL wsrep_slave_threads = 4; ---let $wait_condition = SELECT COUNT(*) = @@wsrep_slave_threads + 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user' AND (STATE IS NULL OR STATE NOT LIKE 'InnoDB%'); +--let $wait_condition = SELECT COUNT(*) = @@wsrep_slave_threads + 2 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user' AND (STATE IS NULL OR STATE NOT LIKE 'InnoDB%'); --source include/wait_condition.inc --connection node_1 diff --git a/mysql-test/suite/galera/t/galera_serializable.test b/mysql-test/suite/galera/t/galera_serializable.test index b12d57fd488..acbf99810a2 100644 --- a/mysql-test/suite/galera/t/galera_serializable.test +++ b/mysql-test/suite/galera/t/galera_serializable.test @@ -7,7 +7,6 @@ # --source include/galera_cluster.inc ---source include/have_innodb.inc --connection node_1 @@ -26,7 +25,12 @@ SELECT * FROM t1; --connection node_2 INSERT INTO t1 VALUES (1,1); ---sleep 2 +--connect node_1a, 127.0.0.1, root, , test, $NODE_MYPORT_1 +--connection node_1a +--let $wait_condition = SELECT COUNT(*) = 1 FROM t1; +--error 0,ER_LOCK_DEADLOCK +--source include/wait_condition.inc + --connection node_1 --error ER_LOCK_DEADLOCK SELECT * FROM t1; @@ -46,7 +50,11 @@ SELECT * FROM t1; --connection node_2 UPDATE t1 SET f2 = 2; ---sleep 2 +--connection node_1a +--let $wait_condition = SELECT COUNT(*) = 1 FROM t1; +--error 0,ER_LOCK_DEADLOCK +--source include/wait_condition.inc + --connection node_1 --error ER_LOCK_DEADLOCK UPDATE t1 SET f2 = 3; @@ -62,8 +70,6 @@ DELETE FROM t1; --connection node_1 START TRANSACTION; - ---connection node_1 INSERT INTO t1 VALUES (1,1); --connection node_2 @@ -73,4 +79,18 @@ INSERT INTO t1 VALUES (1,2); --error ER_LOCK_DEADLOCK COMMIT; +--let $wait_condition = SELECT COUNT(*) = 1 FROM t1; +--source include/wait_condition.inc + +SELECT * from t1; + +--connection node_2 + +--let $wait_condition = SELECT COUNT(*) = 1 FROM t1; +--source include/wait_condition.inc + +SELECT * from t1; + DROP TABLE t1; + +--disconnect node_1a diff --git a/mysql-test/suite/galera/t/galera_unicode_pk.test b/mysql-test/suite/galera/t/galera_unicode_pk.test index 0d571f5cfd7..242d27b2fa7 100644 --- a/mysql-test/suite/galera/t/galera_unicode_pk.test +++ b/mysql-test/suite/galera/t/galera_unicode_pk.test @@ -12,6 +12,9 @@ CREATE TABLE t1 ( INSERT INTO t1 VALUES ('текст'); --connection node_2 +--let $wait_condition = SELECT COUNT(*) = 1 FROM t1; +--source include/wait_condition.inc + SELECT f1 = 'текст' FROM t1; # diff --git a/mysql-test/suite/galera/t/galera_var_slave_threads.test b/mysql-test/suite/galera/t/galera_var_slave_threads.test index 95cd2aac163..940b7ac0e99 100644 --- a/mysql-test/suite/galera/t/galera_var_slave_threads.test +++ b/mysql-test/suite/galera/t/galera_var_slave_threads.test @@ -43,7 +43,7 @@ INSERT INTO t1 VALUES (1); SELECT COUNT(*) FROM t1; --echo # wsrep_slave_threads = 64 ---let $wait_condition = SELECT COUNT(*) = @@wsrep_slave_threads + 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user'; +--let $wait_condition = SELECT COUNT(*) = 64 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user' AND (STATE LIKE '%wsrep aborter%' OR STATE IS NULL) --source include/wait_condition.inc SELECT COUNT(*) FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user' AND STATE LIKE '%wsrep aborter%'; @@ -75,7 +75,7 @@ while ($count) SELECT COUNT(*) FROM t2; --echo # wsrep_slave_threads = 1 ---let $wait_condition = SELECT COUNT(*) = @@wsrep_slave_threads + 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user'; +--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user' AND (STATE LIKE '%wsrep aborter%' OR STATE IS NULL); --source include/wait_condition.inc SELECT COUNT(*) FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user' AND STATE LIKE '%wsrep aborter%'; @@ -94,7 +94,7 @@ CREATE TABLE t1 (i INT AUTO_INCREMENT PRIMARY KEY) ENGINE=INNODB; --connection node_2 SET GLOBAL wsrep_slave_threads = 4; ---let $wait_condition = SELECT COUNT(*) = @@wsrep_slave_threads + 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user' +--let $wait_condition = SELECT COUNT(*) = 4 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user' AND (STATE LIKE '%wsrep aborter%' OR STATE IS NULL); --source include/wait_condition.inc SET GLOBAL wsrep_slave_threads = 1; @@ -119,7 +119,7 @@ SELECT NAME FROM INFORMATION_SCHEMA.INNODB_SYS_TABLES WHERE NAME LIKE 'test/t%'; # # make sure that we are left with exactly one applier thread before we leaving the test # ---let $wait_condition = SELECT COUNT(*) = 2 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user' +--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user' AND STATE LIKE '%wsrep aborter%'; --source include/wait_condition.inc SELECT COUNT(*) FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user' AND STATE LIKE '%wsrep aborter%'; diff --git a/mysql-test/suite/wsrep/r/variables.result b/mysql-test/suite/wsrep/r/variables.result index d11cffba116..800fce6b01c 100644 --- a/mysql-test/suite/wsrep/r/variables.result +++ b/mysql-test/suite/wsrep/r/variables.result @@ -144,7 +144,7 @@ Variable_name Value Threads_connected 1 SHOW STATUS LIKE 'wsrep_thread_count'; Variable_name Value -wsrep_thread_count 2 +wsrep_thread_count 3 SET @wsrep_slave_threads_saved= @@global.wsrep_slave_threads; SET GLOBAL wsrep_slave_threads= 10; @@ -154,7 +154,7 @@ Variable_name Value Threads_connected 1 SHOW STATUS LIKE 'wsrep_thread_count'; Variable_name Value -wsrep_thread_count 11 +wsrep_thread_count 12 set wsrep_on=0; set wsrep_on=1; create user test@localhost; diff --git a/mysql-test/suite/wsrep/t/variables.test b/mysql-test/suite/wsrep/t/variables.test index 1a2ab2579a5..59d52e04c75 100644 --- a/mysql-test/suite/wsrep/t/variables.test +++ b/mysql-test/suite/wsrep/t/variables.test @@ -104,7 +104,7 @@ SET GLOBAL wsrep_cluster_address= 'gcomm://'; --echo # Wait for applier threads to get created. --let $wait_timeout=600 ---let $wait_condition = SELECT COUNT(*) = 2 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user' AND (STATE IS NULL OR STATE NOT LIKE 'InnoDB%'); +--let $wait_condition = SELECT COUNT(*) = 3 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user' AND (STATE IS NULL OR STATE NOT LIKE 'InnoDB%'); --source include/wait_condition.inc --replace_regex /.*libgalera_smm.*/libgalera_smm.so/ @@ -118,7 +118,7 @@ SET @wsrep_slave_threads_saved= @@global.wsrep_slave_threads; SET GLOBAL wsrep_slave_threads= 10; --echo # Wait for applier threads to get created. ---let $wait_condition = SELECT COUNT(*) = 11 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user' AND (STATE IS NULL OR STATE NOT LIKE 'InnoDB%'); +--let $wait_condition = SELECT COUNT(*) = 12 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'system user' AND (STATE IS NULL OR STATE NOT LIKE 'InnoDB%'); --source include/wait_condition.inc SHOW STATUS LIKE 'threads_connected'; diff --git a/sql/sql_class.h b/sql/sql_class.h index 9071a2db516..d8f842decea 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -4114,6 +4114,8 @@ public: #ifdef WITH_WSREP const bool wsrep_applier; /* dedicated slave applier thread */ + bool wsrep_killer; /* dedicated background + kill thread */ bool wsrep_applier_closing; /* applier marked to close */ bool wsrep_client_thread; /* to identify client threads*/ bool wsrep_PA_safe; diff --git a/sql/sql_plugin_services.ic b/sql/sql_plugin_services.ic index 6cf2a31f76b..dcc28036bbc 100644 --- a/sql/sql_plugin_services.ic +++ b/sql/sql_plugin_services.ic @@ -181,7 +181,8 @@ static struct wsrep_service_st wsrep_handler = { wsrep_trx_is_aborting, wsrep_trx_order_before, wsrep_unlock_rollback, - wsrep_set_data_home_dir + wsrep_set_data_home_dir, + wsrep_enqueue_background_kill }; static struct thd_specifics_service_st thd_specifics_handler= diff --git a/sql/wsrep_dummy.cc b/sql/wsrep_dummy.cc index 364ef2d3e7a..0f59cd434ed 100644 --- a/sql/wsrep_dummy.cc +++ b/sql/wsrep_dummy.cc @@ -16,6 +16,7 @@ #include <my_global.h> #include <sql_class.h> #include <mysql/service_wsrep.h> +#include "wsrep_mysqld.h" my_bool wsrep_thd_is_BF(THD *, my_bool) { return 0; } @@ -145,3 +146,6 @@ void wsrep_set_data_home_dir(const char *) void wsrep_log(void (*)(const char *, ...), const char *, ...) { } +bool wsrep_enqueue_background_kill(wsrep_kill_t item) +{ return false;} + diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index f38bf85cd1a..af07dc490bc 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -38,6 +38,8 @@ #include <cstdlib> #include "log_event.h" #include "sql_plugin.h" /* wsrep_plugins_pre_init() */ +#include <list> +#include <algorithm> wsrep_t *wsrep = NULL; /* @@ -123,6 +125,8 @@ mysql_cond_t COND_wsrep_replaying; mysql_mutex_t LOCK_wsrep_slave_threads; mysql_mutex_t LOCK_wsrep_desync; mysql_mutex_t LOCK_wsrep_config_state; +mysql_mutex_t LOCK_wsrep_kill; +mysql_cond_t COND_wsrep_kill; int wsrep_replaying= 0; ulong wsrep_running_threads = 0; // # of currently running wsrep threads @@ -133,11 +137,13 @@ PSI_mutex_key key_LOCK_wsrep_rollback, key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst, key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init, key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync, - key_LOCK_wsrep_config_state; + key_LOCK_wsrep_config_state, + key_LOCK_wsrep_kill; PSI_cond_key key_COND_wsrep_rollback, key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst, - key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread; + key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread, + key_COND_wsrep_kill; PSI_file_key key_file_wsrep_gra_log; @@ -152,7 +158,8 @@ static PSI_mutex_info wsrep_mutexes[]= { &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL}, - { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL} + { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_kill, "LOCK_wsrep_kill", PSI_FLAG_GLOBAL} }; static PSI_cond_info wsrep_conds[]= @@ -162,7 +169,8 @@ static PSI_cond_info wsrep_conds[]= { &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL}, { &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0}, { &key_COND_wsrep_rollback, "COND_wsrep_rollback", PSI_FLAG_GLOBAL}, - { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL} + { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}, + { &key_COND_wsrep_kill, "COND_wsrep_kill", PSI_FLAG_GLOBAL} }; static PSI_file_info wsrep_files[]= @@ -213,6 +221,7 @@ wsp::Config_state wsrep_config_state; // if there was no state gap on receiving first view event. static my_bool wsrep_startup = TRUE; +std::list< wsrep_kill_t > wsrep_kill_list; static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) { switch (level) { @@ -799,6 +808,8 @@ void wsrep_thr_init() mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_kill, &LOCK_wsrep_kill, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_kill, &COND_wsrep_kill, NULL); } void wsrep_init_startup (bool first) @@ -833,6 +844,7 @@ void wsrep_init_startup (bool first) if (!wsrep_start_replication()) unireg_abort(1); wsrep_create_rollbacker(); + wsrep_create_killer(); wsrep_create_appliers(1); if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed @@ -872,6 +884,8 @@ void wsrep_thr_deinit() mysql_mutex_destroy(&LOCK_wsrep_slave_threads); mysql_mutex_destroy(&LOCK_wsrep_desync); mysql_mutex_destroy(&LOCK_wsrep_config_state); + mysql_mutex_destroy(&LOCK_wsrep_kill); + mysql_cond_destroy(&COND_wsrep_kill); } void wsrep_recover() @@ -1636,7 +1650,7 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, if (wsrep_can_run_in_toi(thd, db_, table_, table_list) == false) { - WSREP_DEBUG("No TOI for %s", WSREP_QUERY(thd)); + WSREP_DEBUG("No TOI for %s", wsrep_thd_query(thd)); return 1; } @@ -2353,7 +2367,7 @@ void wsrep_close_client_connections(my_bool wait_to_end, THD *except_caller_thd) } DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count)); - WSREP_DEBUG("waiting for client connections to close: %u", thread_count); + WSREP_DEBUG("Waiting for client connections to close: %u", thread_count); while (wait_to_end && have_client_connections()) { @@ -2387,7 +2401,7 @@ void wsrep_close_threads(THD *thd) DBUG_PRINT("quit",("Informing thread %ld that it's time to die", tmp->thread_id)); /* We skip slave threads & scheduler on this first loop through. */ - if (tmp->wsrep_applier && tmp != thd) + if ((tmp->wsrep_applier || tmp->wsrep_killer) && tmp != thd) { WSREP_DEBUG("closing wsrep thread %ld", tmp->thread_id); wsrep_close_thread (tmp); @@ -2401,8 +2415,9 @@ void wsrep_wait_appliers_close(THD *thd) { /* Wait for wsrep appliers to gracefully exit */ mysql_mutex_lock(&LOCK_thread_count); - while (wsrep_running_threads > 1) + while (wsrep_running_threads > 2) // 1 is for rollbacker thread which needs to be killed explicitly. + // 2 is for wsrep background killer thread // This gotta be fixed in a more elegant manner if we gonna have arbitrary // number of non-applier wsrep threads. { @@ -2647,11 +2662,11 @@ extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id) extern "C" void wsrep_thd_awake(THD *thd, my_bool signal) { + mysql_mutex_assert_owner(&thd->LOCK_thd_data); + if (signal) { - mysql_mutex_lock(&thd->LOCK_thd_data); thd->awake(KILL_QUERY); - mysql_mutex_unlock(&thd->LOCK_thd_data); } else { @@ -2659,6 +2674,8 @@ extern "C" void wsrep_thd_awake(THD *thd, my_bool signal) mysql_cond_broadcast(&COND_wsrep_replaying); mysql_mutex_unlock(&LOCK_wsrep_replaying); } + + mysql_mutex_unlock(&thd->LOCK_thd_data); } @@ -2923,3 +2940,31 @@ bool wsrep_node_is_synced() { return (WSREP_ON) ? (wsrep_config_state.get_status() == 4) : false; } + +bool wsrep_enqueue_background_kill(wsrep_kill_t item) +{ + std::list< wsrep_kill_t >::iterator it; + bool inserted= false; + + mysql_mutex_lock(&LOCK_wsrep_kill); + + for (it = wsrep_kill_list.begin(); it != wsrep_kill_list.end(); it++) + { + if ((*it).victim_thd_id == item.victim_thd_id) + break; + } + + if(it != wsrep_kill_list.end()) + { + WSREP_DEBUG("Thread: %lu already on kill list", item.victim_thd_id); + } + else + { + wsrep_kill_list.push_back(item); + mysql_cond_signal(&COND_wsrep_kill); + inserted= true; + } + + mysql_mutex_unlock(&LOCK_wsrep_kill); + return inserted; +} diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 1531abe78f8..a14688de464 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -18,9 +18,20 @@ #ifndef WSREP_MYSQLD_H #define WSREP_MYSQLD_H +#include <my_config.h> +#include <stdint.h> #include <mysql/plugin.h> #include <mysql/service_wsrep.h> +typedef struct wsrep_kill { + unsigned long victim_thd_id; + unsigned long bf_thd_id; + uint64_t victim_trx_id; + uint64_t bf_trx_id; + bool signal; + bool wait_lock; +} wsrep_kill_t; + #ifdef WITH_WSREP typedef struct st_mysql_show_var SHOW_VAR; @@ -89,6 +100,7 @@ extern my_bool wsrep_restart_slave_activated; extern my_bool wsrep_slave_FK_checks; extern my_bool wsrep_slave_UK_checks; extern ulong wsrep_running_threads; +extern ulong wsrep_running_killer_threads; extern bool wsrep_new_cluster; extern bool wsrep_gtid_mode; extern uint32 wsrep_gtid_domain_id; @@ -221,8 +233,6 @@ void wsrep_log(void (*fun)(const char *, ...), const char *format, ...); #define WSREP_PROVIDER_EXISTS \ (wsrep_provider && strncasecmp(wsrep_provider, WSREP_NONE, FN_REFLEN)) -#define WSREP_QUERY(thd) (thd->query()) - extern my_bool wsrep_ready_get(); extern void wsrep_ready_wait(); @@ -252,6 +262,8 @@ extern mysql_cond_t COND_wsrep_replaying; extern mysql_mutex_t LOCK_wsrep_slave_threads; extern mysql_mutex_t LOCK_wsrep_desync; extern mysql_mutex_t LOCK_wsrep_config_state; +extern mysql_mutex_t LOCK_wsrep_kill; +extern mysql_cond_t COND_wsrep_kill; extern wsrep_aborting_thd_t wsrep_aborting_thd; extern my_bool wsrep_emulate_bin_log; extern int wsrep_to_isolation; @@ -276,8 +288,11 @@ extern PSI_mutex_key key_LOCK_wsrep_replaying; extern PSI_cond_key key_COND_wsrep_replaying; extern PSI_mutex_key key_LOCK_wsrep_slave_threads; extern PSI_mutex_key key_LOCK_wsrep_desync; +extern PSI_mutex_key key_LOCK_wsrep_kill; +extern PSI_cond_key key_COND_wsrep_kill; extern PSI_file_key key_file_wsrep_gra_log; +extern PSI_thread_key key_wsrep_killer; #endif /* HAVE_PSI_INTERFACE */ struct TABLE_LIST; class Alter_info; @@ -301,6 +316,7 @@ void thd_binlog_rollback_stmt(THD * thd); void thd_binlog_trx_reset(THD * thd); typedef void (*wsrep_thd_processor_fun)(THD *); + pthread_handler_t start_wsrep_THD(void *arg); int wsrep_wait_committing_connections_close(int wait_time); extern void wsrep_close_client_connections(my_bool wait_to_end, diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc index fad9e3f70c8..943d022589e 100644 --- a/sql/wsrep_thd.cc +++ b/sql/wsrep_thd.cc @@ -22,11 +22,16 @@ //#include "global_threads.h" // LOCK_thread_count, etc. #include "sql_base.h" // close_thread_tables() #include "mysqld.h" // start_wsrep_THD(); +#include "sql_show.h" // find_thread_by_id #include "slave.h" // opt_log_slave_updates #include "rpl_filter.h" #include "rpl_rli.h" #include "rpl_mi.h" +#include "debug_sync.h" +#include <list> + +extern std::list< wsrep_kill_t > wsrep_kill_list; #if (__LP64__) static volatile int64 wsrep_bf_aborts_counter(0); @@ -511,7 +516,7 @@ static void wsrep_rollback_process(THD *thd) mysql_mutex_unlock(&aborting->LOCK_thd_data); - set_current_thd(aborting); + set_current_thd(aborting); aborting->store_globals(); mysql_mutex_lock(&aborting->LOCK_thd_data); @@ -520,7 +525,7 @@ static void wsrep_rollback_process(THD *thd) aborting->thread_id, (long long)aborting->real_id); mysql_mutex_unlock(&aborting->LOCK_thd_data); - set_current_thd(thd); + set_current_thd(thd); thd->store_globals(); mysql_mutex_lock(&LOCK_wsrep_rollback); @@ -697,3 +702,332 @@ void wsrep_thd_auto_increment_variables(THD* thd, *increment= thd->variables.auto_increment_increment; } } + +static void wsrep_abort_slave_trx(long long bf_seqno, long long victim_seqno) +{ + WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be " + "caused by:\n\t" + "1) unsupported configuration options combination, please check documentation.\n\t" + "2) a bug in the code.\n\t" + "3) a database corruption.\n Node consistency compromized, " + "need to abort. Restart the node to resync with cluster.", + bf_seqno, victim_seqno); + abort(); +} + +static int wsrep_kill(wsrep_kill_t* item) +{ + bool signal= item->signal; + unsigned long long victim_trx_id= static_cast<unsigned long long>(item->victim_trx_id); + unsigned long long bf_trx_id= static_cast<unsigned long long>(item->bf_trx_id); + + // Note that find_thread_by_id will acquire LOCK_thd_data mutex + // for thd if it's found + THD* bf_thd= find_thread_by_id(item->bf_thd_id, false); + + if (!bf_thd) + { + WSREP_ERROR("BF thread: %lu not found", item->bf_thd_id); + assert(0); + } + + long long bf_seqno= wsrep_thd_trx_seqno(bf_thd); + + WSREP_DEBUG("Aborter %s trx_id: %llu thread: %ld " + "seqno: %lld query_state: %s conflict_state: %s query: %s", + wsrep_thd_is_BF(bf_thd, false) ? "BF" : "normal", + bf_trx_id, + item->bf_thd_id, + bf_seqno, + wsrep_thd_query_state_str(bf_thd), + wsrep_thd_conflict_state_str(bf_thd), + wsrep_thd_query(bf_thd)); + + // Note that we need to release LOCK_thd_data mutex from BF thread + // to obey safe mutex ordering of LOCK_thread_count -> LOCK_thd_data + // that both are taken on find_thread_by_id + wsrep_thd_UNLOCK(bf_thd); + + THD* thd= find_thread_by_id(item->victim_thd_id, false); + + if (!thd) + { + WSREP_DEBUG("Victim thread: %lu not found", item->victim_thd_id); + return(0); + } + + WSREP_LOG_CONFLICT(bf_thd, thd, TRUE); + + unsigned long victim_thread= item->victim_thd_id; + long long victim_seqno= wsrep_thd_trx_seqno(thd); + + WSREP_DEBUG("Victim %s trx_id: %llu thread: %ld " + "seqno: %lld query_state: %s conflict_state: %s query: %s", + wsrep_thd_is_BF(thd, false) ? "BF" : "normal", + victim_trx_id, + victim_thread, + victim_seqno, + wsrep_thd_query_state_str(thd), + wsrep_thd_conflict_state_str(thd), + wsrep_thd_query(thd)); + + if (wsrep_thd_query_state(thd) == QUERY_EXITING) + { + WSREP_DEBUG("Victim query state QUERY_EXITING trx: %llu" + " thread: %lu", + victim_trx_id, + victim_thread); + wsrep_thd_UNLOCK(thd); + return(0); + } + + if (wsrep_thd_exec_mode(thd) != LOCAL_STATE) + { + WSREP_DEBUG("Victim withdraw of non local for BF trx: %llu " + ", thread: %lu exec_mode: %s", + victim_trx_id, + victim_thread, + wsrep_thd_exec_mode_str(thd)); + } + + switch (wsrep_thd_get_conflict_state(thd)) + { + case NO_CONFLICT: + WSREP_DEBUG("Victim thread: %lu trx: %llu in NO_CONFLICT state", + victim_thread, + victim_trx_id); + wsrep_thd_set_conflict_state(thd, MUST_ABORT); + break; + case MUST_ABORT: + WSREP_DEBUG("Victim thread: %lu trx: %llu in MUST_ABORT state", + victim_thread, + victim_trx_id); + wsrep_thd_awake(thd, signal); + return(0); + break; + case ABORTED: + case ABORTING: // fall through + default: + WSREP_DEBUG("Victim thread: %lu trx: %llu in state: %s", + victim_thread, + victim_trx_id, + wsrep_thd_conflict_state_str(thd)); + wsrep_thd_UNLOCK(thd); + return(0); + break; + } + + switch (wsrep_thd_query_state(thd)) + { + case QUERY_COMMITTING: + { + enum wsrep_status rcode=WSREP_OK; + + WSREP_DEBUG("Victim kill trx QUERY_COMMITTING state thread: %ld trx: %llu", + victim_thread, + victim_trx_id); + + if (wsrep_thd_exec_mode(thd) == REPL_RECV) + { + WSREP_DEBUG("Victim REPL_RECV abort slave thread: %ld trx: %llu" + " bf_seqno: %lld victim_seqno: %lld", + victim_thread, + victim_trx_id, + bf_seqno, + victim_seqno); + + wsrep_abort_slave_trx(bf_seqno, victim_seqno); + } + else + { + wsrep_t *wsrep= get_wsrep(); + + rcode= wsrep->abort_pre_commit(wsrep, bf_seqno, + (wsrep_trx_id_t)wsrep_thd_ws_handle(thd)->trx_id); + + switch (rcode) + { + case WSREP_WARNING: + { + WSREP_DEBUG("Victim cancel commit warning thread: %lu trx: %llu", + victim_thread, + victim_trx_id); + + wsrep_thd_awake(thd, signal); + return(1); + break; + } + case WSREP_OK: + break; + default: + { + WSREP_ERROR("Victim cancel commit bad commit exit thread: " + "%lu trx: %llu rcode: %d ", + victim_thread, + victim_trx_id, + rcode); + /* unable to interrupt, must abort */ + /* note: kill_mysql() will block, if we cannot. + * kill the lock holder first. */ + abort(); + break; + } + } + } + + wsrep_thd_awake(thd, signal); + break; + } + case QUERY_EXEC: + { + /* it is possible that victim trx is itself waiting for some + * other lock. We need to cancel this waiting */ + WSREP_DEBUG("Victim kill trx QUERY_EXEC state thread: %ld trx: %llu", + victim_thread, victim_trx_id); + + bool wait_lock= item->wait_lock; + + if (wait_lock) + { + WSREP_DEBUG("Victim thread: %lu trx: %llu has lock wait flag", + victim_thread, + victim_trx_id); + + wsrep_thd_awake(thd, signal); + } + else + { + /* Abort currently executing query */ + WSREP_DEBUG("Kill query for victim thread: %lu trx: %llu", + victim_thread, + victim_trx_id); + + wsrep_thd_awake(thd, signal); + + /* for BF thd, we need to prevent him from committing */ + if (wsrep_thd_exec_mode(thd) == REPL_RECV) + { + WSREP_DEBUG("Victim REPL_RECV abort slave for thread: " + "%lu trx: %llu" + " bf_seqno: %lld victim_seqno: %lld", + victim_thread, + victim_trx_id, + bf_seqno, + victim_seqno); + + wsrep_abort_slave_trx(bf_seqno, victim_seqno); + } + } + break; + } + case QUERY_IDLE: + { + WSREP_DEBUG("Victim kill trx QUERY_IDLE state thread: %ld trx: %llu", + victim_thread, + victim_trx_id); + + if (wsrep_thd_exec_mode(thd) == REPL_RECV) + { + WSREP_DEBUG("Victim REPL_RECV kill BF IDLE, thread: %ld trx: " + "%llu bf_seqno: %lld victim_seqno: %lld", + victim_thread, + victim_trx_id, + bf_seqno, + victim_seqno); + + wsrep_thd_UNLOCK(thd); + wsrep_abort_slave_trx(bf_seqno, victim_seqno); + return(0); + } + + /* This will lock thd from proceeding after net_read() */ + wsrep_thd_set_conflict_state(thd, ABORTING); + + wsrep_lock_rollback(); + + if (wsrep_aborting_thd_contains(thd)) + { + WSREP_WARN("Victim is duplicate thd aborter thread: %ld trx: %llu", + victim_thread, + victim_trx_id); + } + else + { + wsrep_aborting_thd_enqueue(thd); + WSREP_DEBUG("Enqueuing victim thread: %ld trx: %llu for abort", + victim_thread, + victim_trx_id); + } + + wsrep_unlock_rollback(); + wsrep_thd_UNLOCK(thd); + + break; + } + default: + { + WSREP_WARN("Victim thread: %ld trx: %llu in bad wsrep query state: %s", + victim_thread, + victim_trx_id, + wsrep_thd_query_state_str(thd)); + + wsrep_thd_UNLOCK(thd); + break; + } + } + + return(0); +} + +static void wsrep_process_kill(THD *thd) +{ + DBUG_ENTER("wsrep_process_kill"); + + mysql_mutex_lock(&LOCK_wsrep_kill); + + WSREP_DEBUG("WSREP killer thread started"); + + while (thd->killed == NOT_KILLED) + { + thd_proc_info(thd, "wsrep killer idle"); + thd->mysys_var->current_mutex= &LOCK_wsrep_kill; + thd->mysys_var->current_cond= &COND_wsrep_kill; + + mysql_cond_wait(&COND_wsrep_kill,&LOCK_wsrep_kill); + + WSREP_DEBUG("WSREP killer thread wakes for signal"); + + mysql_mutex_lock(&thd->mysys_var->mutex); + thd_proc_info(thd, "wsrep killer active"); + thd->mysys_var->current_mutex= 0; + thd->mysys_var->current_cond= 0; + mysql_mutex_unlock(&thd->mysys_var->mutex); + + /* process all entries in the queue */ + while (!wsrep_kill_list.empty()) + { + wsrep_kill_t to_be_killed= wsrep_kill_list.front(); + // Release list mutex while we kill one thread + mysql_mutex_unlock(&LOCK_wsrep_kill); + wsrep_kill(&to_be_killed); + mysql_mutex_lock(&LOCK_wsrep_kill); + wsrep_kill_list.pop_front(); + } + } + + assert(wsrep_kill_list.empty()); + + mysql_mutex_unlock(&LOCK_wsrep_kill); + sql_print_information("WSREP: killer thread exiting"); + DBUG_PRINT("wsrep",("wsrep killer thread exiting")); + DBUG_VOID_RETURN; +} + + +void wsrep_create_killer() +{ + if (create_wsrep_THD(wsrep_process_kill)) { + WSREP_WARN("Can't create thread to manage wsrep background kill"); + } +} + diff --git a/sql/wsrep_thd.h b/sql/wsrep_thd.h index 6ce14a4eb0e..74d5e325997 100644 --- a/sql/wsrep_thd.h +++ b/sql/wsrep_thd.h @@ -28,6 +28,7 @@ void wsrep_client_rollback(THD *thd); void wsrep_replay_transaction(THD *thd); void wsrep_create_appliers(long threads); void wsrep_create_rollbacker(); +void wsrep_create_killer(); int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal); diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc index 48e373bc56d..609d84b31c9 100644 --- a/sql/wsrep_var.cc +++ b/sql/wsrep_var.cc @@ -472,6 +472,7 @@ bool wsrep_cluster_address_update (sys_var *self, THD* thd, enum_var_type type) if (wsrep_start_replication()) { wsrep_create_rollbacker(); + wsrep_create_killer(); wsrep_create_appliers(wsrep_slave_threads); } diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index c54dae220ae..82250bf7a60 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -4927,8 +4927,6 @@ static void innobase_kill_query(handlerton*, THD* thd, enum thd_kill_levels) /* if victim has been signaled by BF thread and/or aborting is already progressing, following query aborting is not necessary any more. - Also, BF thread should own trx mutex for the victim, which would - conflict with trx_mutex_enter() below */ DBUG_VOID_RETURN; } @@ -18542,239 +18540,82 @@ static struct st_mysql_storage_engine innobase_storage_engine= { MYSQL_HANDLERTON_INTERFACE_VERSION }; #ifdef WITH_WSREP -void -wsrep_abort_slave_trx(wsrep_seqno_t bf_seqno, wsrep_seqno_t victim_seqno) -{ - WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be " - "caused by:\n\t" - "1) unsupported configuration options combination, please check documentation.\n\t" - "2) a bug in the code.\n\t" - "3) a database corruption.\n Node consistency compromized, " - "need to abort. Restart the node to resync with cluster.", - (long long)bf_seqno, (long long)victim_seqno); - abort(); -} -/*******************************************************************//** -This function is used to kill one transaction in BF. */ + +/** This function is used to kill one transaction. + +This transaction was open on this node (not-yet-committed), and a +conflicting writeset from some other node that was being applied +caused a locking conflict. First committed (from other node) +wins, thus open transaction is rolled back. BF stands for +brute-force: any transaction can get aborted by galera any time +it is necessary. + +This conflict can happen only when the replicated writeset (from +other node) is being applied, not when it’s waiting in the queue. +If our local transaction reached its COMMIT and this conflicting +writeset was in the queue, then it should fail the local +certification test instead. + +A brute force abort is only triggered by a locking conflict +between a writeset being applied by an applier thread (slave thread) +and an open transaction on the node, not by a Galera writeset +comparison as in the local certification failure. + +@param[in] bf_thd Brute force (BF) thread +@param[in,out] victim_trx Vimtim trx to be killed +@param[in] signal Should victim be signaled */ UNIV_INTERN int wsrep_innobase_kill_one_trx( - void * const bf_thd_ptr, - const trx_t * const bf_trx, + THD* bf_thd, trx_t *victim_trx, - ibool signal) + my_bool signal) { - ut_ad(lock_mutex_own()); - ut_ad(trx_mutex_own(victim_trx)); - ut_ad(bf_thd_ptr); - ut_ad(victim_trx); + ut_ad(victim_trx); DBUG_ENTER("wsrep_innobase_kill_one_trx"); - THD *bf_thd = bf_thd_ptr ? (THD*) bf_thd_ptr : NULL; - THD *thd = (THD *) victim_trx->mysql_thd; - int64_t bf_seqno = (bf_thd) ? wsrep_thd_trx_seqno(bf_thd) : 0; - - if (!thd) { - DBUG_PRINT("wsrep", ("no thd for conflicting lock")); - WSREP_WARN("no THD for trx: " TRX_ID_FMT, victim_trx->id); - DBUG_RETURN(1); - } - - if (!bf_thd) { - DBUG_PRINT("wsrep", ("no BF thd for conflicting lock")); - WSREP_WARN("no BF THD for trx: " TRX_ID_FMT, - bf_trx ? bf_trx->id : 0); - DBUG_RETURN(1); - } - WSREP_LOG_CONFLICT(bf_thd, thd, TRUE); + THD *thd= (THD *) victim_trx->mysql_thd; + /* Note that bf_trx might not exists here e.g. on MDL conflict + case. See galera_concurrent_ctas test case */ + trx_t* bf_trx= thd_to_trx(bf_thd); + wsrep_kill_t item; - WSREP_DEBUG("BF kill (%lu, seqno: %lld), victim: (%lu) trx: " - TRX_ID_FMT, - signal, (long long)bf_seqno, - thd_get_thread_id(thd), - victim_trx->id); + ut_ad(bf_thd); + ut_ad(thd); - WSREP_DEBUG("Aborting query: %s conf %d trx: %" PRId64, - (thd && wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void", - wsrep_thd_conflict_state(thd, FALSE), - wsrep_thd_ws_handle(thd)->trx_id); - - wsrep_thd_LOCK(thd); - DBUG_EXECUTE_IF("sync.wsrep_after_BF_victim_lock", - { - const char act[]= - "now " - "wait_for signal.wsrep_after_BF_victim_lock"; - DBUG_ASSERT(!debug_sync_set_action(bf_thd, - STRING_WITH_LEN(act))); - };); - - - if (wsrep_thd_query_state(thd) == QUERY_EXITING) { - WSREP_DEBUG("kill trx EXITING for " TRX_ID_FMT, - victim_trx->id); - wsrep_thd_UNLOCK(thd); - DBUG_RETURN(0); - } + /* Actual processing of the victim kill is handled later + on background thread. At this point we may not hold + LOCK_thd_data mutex as we are already holding lock sys + and trx mutex. */ - if(wsrep_thd_exec_mode(thd) != LOCAL_STATE) { - WSREP_DEBUG("withdraw for BF trx: " TRX_ID_FMT ", state: %d", - victim_trx->id, - wsrep_thd_get_conflict_state(thd)); - } + item.victim_thd_id = thd_get_thread_id(thd); + item.victim_trx_id = victim_trx->id; + item.bf_thd_id = thd_get_thread_id(bf_thd); + item.bf_trx_id = bf_trx ? bf_trx->id : TRX_ID_MAX; + item.signal = signal; + item.wait_lock = (victim_trx->lock.wait_lock ? true : false); - switch (wsrep_thd_get_conflict_state(thd)) { - case NO_CONFLICT: - wsrep_thd_set_conflict_state(thd, MUST_ABORT); - break; - case MUST_ABORT: - WSREP_DEBUG("victim " TRX_ID_FMT " in MUST ABORT state", - victim_trx->id); - wsrep_thd_UNLOCK(thd); - wsrep_thd_awake(thd, signal); - DBUG_RETURN(0); - break; - case ABORTED: - case ABORTING: // fall through - default: - WSREP_DEBUG("victim " TRX_ID_FMT " in state %d", - victim_trx->id, wsrep_thd_get_conflict_state(thd)); - wsrep_thd_UNLOCK(thd); - DBUG_RETURN(0); - break; + /* If victim itself is waiting a lock, cancel wait lock. */ + if (victim_trx->lock.wait_lock) { + lock_cancel_waiting_and_release(victim_trx->lock.wait_lock); } - switch (wsrep_thd_query_state(thd)) { - case QUERY_COMMITTING: - enum wsrep_status rcode; - - WSREP_DEBUG("kill query for: %ld", - thd_get_thread_id(thd)); - WSREP_DEBUG("kill trx QUERY_COMMITTING for " TRX_ID_FMT, - victim_trx->id); - - if (wsrep_thd_exec_mode(thd) == REPL_RECV) { - wsrep_abort_slave_trx(bf_seqno, - wsrep_thd_trx_seqno(thd)); - } else { - wsrep_t *wsrep= get_wsrep(); - rcode = wsrep->abort_pre_commit( - wsrep, bf_seqno, - (wsrep_trx_id_t)wsrep_thd_ws_handle(thd)->trx_id - ); - - switch (rcode) { - case WSREP_WARNING: - WSREP_DEBUG("cancel commit warning: " - TRX_ID_FMT, - victim_trx->id); - wsrep_thd_UNLOCK(thd); - wsrep_thd_awake(thd, signal); - DBUG_RETURN(1); - break; - case WSREP_OK: - break; - default: - WSREP_ERROR( - "cancel commit bad exit: %d " - TRX_ID_FMT, - rcode, - victim_trx->id); - /* unable to interrupt, must abort */ - /* note: kill_mysql() will block, if we cannot. - * kill the lock holder first. - */ - abort(); - break; - } - } - wsrep_thd_UNLOCK(thd); - wsrep_thd_awake(thd, signal); - break; - case QUERY_EXEC: - /* it is possible that victim trx is itself waiting for some - * other lock. We need to cancel this waiting - */ - WSREP_DEBUG("kill trx QUERY_EXEC for " TRX_ID_FMT, - victim_trx->id); - - victim_trx->lock.was_chosen_as_deadlock_victim= TRUE; - if (victim_trx->lock.wait_lock) { - WSREP_DEBUG("victim has wait flag: %ld", - thd_get_thread_id(thd)); - lock_t* wait_lock = victim_trx->lock.wait_lock; - if (wait_lock) { - WSREP_DEBUG("canceling wait lock"); - victim_trx->lock.was_chosen_as_deadlock_victim= TRUE; - lock_cancel_waiting_and_release(wait_lock); - } - - wsrep_thd_UNLOCK(thd); - wsrep_thd_awake(thd, signal); - } else { - /* abort currently executing query */ - DBUG_PRINT("wsrep",("sending KILL_QUERY to: %lu", - thd_get_thread_id(thd))); - WSREP_DEBUG("kill query for: %ld", - thd_get_thread_id(thd)); - /* Note that innobase_kill_query will take lock_mutex - and trx_mutex */ - wsrep_thd_UNLOCK(thd); - wsrep_thd_awake(thd, signal); - - /* for BF thd, we need to prevent him from committing */ - if (wsrep_thd_exec_mode(thd) == REPL_RECV) { - wsrep_abort_slave_trx(bf_seqno, - wsrep_thd_trx_seqno(thd)); - } - } - break; - case QUERY_IDLE: - { - WSREP_DEBUG("kill IDLE for " TRX_ID_FMT, victim_trx->id); - - if (wsrep_thd_exec_mode(thd) == REPL_RECV) { - WSREP_DEBUG("kill BF IDLE, seqno: %lld", - (long long)wsrep_thd_trx_seqno(thd)); - wsrep_thd_UNLOCK(thd); - wsrep_abort_slave_trx(bf_seqno, - wsrep_thd_trx_seqno(thd)); - DBUG_RETURN(0); - } - /* This will lock thd from proceeding after net_read() */ - wsrep_thd_set_conflict_state(thd, ABORTING); - - wsrep_lock_rollback(); - - if (wsrep_aborting_thd_contains(thd)) { - WSREP_WARN("duplicate thd aborter %lu", - thd_get_thread_id(thd)); - } else { - wsrep_aborting_thd_enqueue(thd); - DBUG_PRINT("wsrep",("enqueuing trx abort for %lu", - thd_get_thread_id(thd))); - WSREP_DEBUG("enqueuing trx abort for (%lu)", - thd_get_thread_id(thd)); - } - - DBUG_PRINT("wsrep",("signalling wsrep rollbacker")); - WSREP_DEBUG("signaling aborter"); - wsrep_unlock_rollback(); - wsrep_thd_UNLOCK(thd); - - break; - } - default: - WSREP_WARN("bad wsrep query state: %d", - wsrep_thd_query_state(thd)); - wsrep_thd_UNLOCK(thd); - break; - } + wsrep_enqueue_background_kill(item); DBUG_RETURN(0); } +/** + This function forces the victim transaction to abort. Aborting the + transaction does NOT end it, it still has to be rolled back. + + @param bf_thd brute force THD asking for the abort + @param victim_thd victim THD to be aborted + + @return 0 victim was aborted + @return -1 victim thread was aborted (no transaction) +*/ static int wsrep_abort_transaction( @@ -18784,11 +18625,13 @@ wsrep_abort_transaction( my_bool signal) { DBUG_ENTER("wsrep_innobase_abort_thd"); - + + ut_a(bf_thd); + ut_a(victim_thd); + trx_t* victim_trx = thd_to_trx(victim_thd); - trx_t* bf_trx = (bf_thd) ? thd_to_trx(bf_thd) : NULL; - WSREP_DEBUG("abort transaction: BF: %s victim: %s victim conf: %d", + WSREP_DEBUG("Abort transaction: BF: %s victim: %s victim conf: %d", wsrep_thd_query(bf_thd), wsrep_thd_query(victim_thd), wsrep_thd_conflict_state(victim_thd, FALSE)); @@ -18796,19 +18639,17 @@ wsrep_abort_transaction( if (victim_trx) { lock_mutex_enter(); trx_mutex_enter(victim_trx); - victim_trx->abort_type = TRX_WSREP_ABORT; - int rcode = wsrep_innobase_kill_one_trx(bf_thd, bf_trx, - victim_trx, signal); - trx_mutex_exit(victim_trx); + int rcode = wsrep_innobase_kill_one_trx(bf_thd, + victim_trx, + signal); lock_mutex_exit(); - victim_trx->abort_type = TRX_SERVER_ABORT; + trx_mutex_exit(victim_trx); wsrep_srv_conc_cancel_wait(victim_trx); DBUG_RETURN(rcode); } else { WSREP_DEBUG("victim does not have transaction"); wsrep_thd_LOCK(victim_thd); wsrep_thd_set_conflict_state(victim_thd, MUST_ABORT); - wsrep_thd_UNLOCK(victim_thd); wsrep_thd_awake(victim_thd, signal); } diff --git a/storage/innobase/include/ha_prototypes.h b/storage/innobase/include/ha_prototypes.h index bcf8a893695..43860424c24 100644 --- a/storage/innobase/include/ha_prototypes.h +++ b/storage/innobase/include/ha_prototypes.h @@ -284,10 +284,9 @@ innobase_casedn_str( #ifdef WITH_WSREP UNIV_INTERN int -wsrep_innobase_kill_one_trx(void * const thd_ptr, - const trx_t * const bf_trx, +wsrep_innobase_kill_one_trx(THD* thd_ptr, trx_t *victim_trx, - ibool signal); + my_bool signal); int wsrep_innobase_mysql_sort(int mysql_type, uint charset_number, unsigned char* str, unsigned int str_length, unsigned int buf_length); diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc index 0c4e40067d1..fda557f9ed0 100644 --- a/storage/innobase/lock/lock0lock.cc +++ b/storage/innobase/lock/lock0lock.cc @@ -1762,7 +1762,7 @@ wsrep_kill_victim( if (!trx->is_wsrep()) return; my_bool bf_this = wsrep_thd_is_BF(trx->mysql_thd, FALSE); - my_bool bf_other = wsrep_thd_is_BF(lock->trx->mysql_thd, TRUE); + my_bool bf_other = wsrep_thd_is_BF(lock->trx->mysql_thd, FALSE); if ((bf_this && !bf_other) || (bf_this && bf_other && wsrep_trx_order_before( @@ -1807,10 +1807,8 @@ wsrep_kill_victim( } } - lock->trx->abort_type = TRX_WSREP_ABORT; wsrep_innobase_kill_one_trx(trx->mysql_thd, - (const trx_t*) trx, lock->trx, TRUE); - lock->trx->abort_type = TRX_SERVER_ABORT; + lock->trx, true); } } } diff --git a/storage/xtradb/handler/ha_innodb.cc b/storage/xtradb/handler/ha_innodb.cc index c57870915dc..c57723bdba4 100644 --- a/storage/xtradb/handler/ha_innodb.cc +++ b/storage/xtradb/handler/ha_innodb.cc @@ -5510,8 +5510,6 @@ static void innobase_kill_query(handlerton*, THD* thd, enum thd_kill_levels) /* if victim has been signaled by BF thread and/or aborting is already progressing, following query aborting is not necessary any more. - Also, BF thread should own trx mutex for the victim, which would - conflict with trx_mutex_enter() below */ DBUG_VOID_RETURN; } @@ -19519,264 +19517,116 @@ static struct st_mysql_storage_engine innobase_storage_engine= { MYSQL_HANDLERTON_INTERFACE_VERSION }; #ifdef WITH_WSREP -void -wsrep_abort_slave_trx(wsrep_seqno_t bf_seqno, wsrep_seqno_t victim_seqno) -{ - WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be " - "caused by:\n\t" - "1) unsupported configuration options combination, please check documentation.\n\t" - "2) a bug in the code.\n\t" - "3) a database corruption.\n Node consistency compromized, " - "need to abort. Restart the node to resync with cluster.", - (long long)bf_seqno, (long long)victim_seqno); - abort(); -} -/*******************************************************************//** -This function is used to kill one transaction in BF. */ + +/** This function is used to kill one transaction. + +This transaction was open on this node (not-yet-committed), and a +conflicting writeset from some other node that was being applied +caused a locking conflict. First committed (from other node) +wins, thus open transaction is rolled back. BF stands for +brute-force: any transaction can get aborted by galera any time +it is necessary. + +This conflict can happen only when the replicated writeset (from +other node) is being applied, not when it’s waiting in the queue. +If our local transaction reached its COMMIT and this conflicting +writeset was in the queue, then it should fail the local +certification test instead. + +A brute force abort is only triggered by a locking conflict +between a writeset being applied by an applier thread (slave thread) +and an open transaction on the node, not by a Galera writeset +comparison as in the local certification failure. + +@param[in] bf_thd Brute force (BF) thread +@param[in,out] victim_trx Vimtim trx to be killed +@param[in] signal Should victim be signaled */ UNIV_INTERN int wsrep_innobase_kill_one_trx( - void * const bf_thd_ptr, - const trx_t * const bf_trx, + THD* bf_thd, trx_t *victim_trx, - ibool signal) + my_bool signal) { - ut_ad(lock_mutex_own()); - ut_ad(trx_mutex_own(victim_trx)); - ut_ad(bf_thd_ptr); - ut_ad(victim_trx); + ut_ad(victim_trx); DBUG_ENTER("wsrep_innobase_kill_one_trx"); - THD *bf_thd = bf_thd_ptr ? (THD*) bf_thd_ptr : NULL; - THD *thd = (THD *) victim_trx->mysql_thd; - int64_t bf_seqno = (bf_thd) ? wsrep_thd_trx_seqno(bf_thd) : 0; - - if (!thd) { - DBUG_PRINT("wsrep", ("no thd for conflicting lock")); - WSREP_WARN("no THD for trx: " TRX_ID_FMT, victim_trx->id); - DBUG_RETURN(1); - } - if (!bf_thd) { - DBUG_PRINT("wsrep", ("no BF thd for conflicting lock")); - WSREP_WARN("no BF THD for trx: " TRX_ID_FMT, - bf_trx ? bf_trx->id : 0); - DBUG_RETURN(1); - } - - WSREP_LOG_CONFLICT(bf_thd, thd, TRUE); - - WSREP_DEBUG("BF kill (%lu, seqno: %lld), victim: (%lu) trx: " - TRX_ID_FMT, - signal, (long long)bf_seqno, - thd_get_thread_id(thd), - victim_trx->id); + THD *thd= (THD *) victim_trx->mysql_thd; + /* Note that bf_trx might not exists here e.g. on MDL conflict + case. See galera_concurrent_ctas test case */ + trx_t* bf_trx= thd_to_trx(bf_thd); + wsrep_kill_t item; - WSREP_DEBUG("Aborting query: %s", - (thd && wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void"); + ut_ad(bf_thd); + ut_ad(thd); - wsrep_thd_LOCK(thd); - DBUG_EXECUTE_IF("sync.wsrep_after_BF_victim_lock", - { - const char act[]= - "now " - "wait_for signal.wsrep_after_BF_victim_lock"; - DBUG_ASSERT(!debug_sync_set_action(bf_thd, - STRING_WITH_LEN(act))); - };); - - - if (wsrep_thd_query_state(thd) == QUERY_EXITING) { - WSREP_DEBUG("kill trx EXITING for " TRX_ID_FMT, - victim_trx->id); - wsrep_thd_UNLOCK(thd); - DBUG_RETURN(0); - } + /* Actual processing of the victim kill is handled later + on background thread. At this point we may not hold + LOCK_thd_data mutex as we are already holding lock sys + and trx mutex. */ - if(wsrep_thd_exec_mode(thd) != LOCAL_STATE) { - WSREP_DEBUG("withdraw for BF trx: " TRX_ID_FMT ", state: %d", - victim_trx->id, - wsrep_thd_get_conflict_state(thd)); - } + item.victim_thd_id = thd_get_thread_id(thd); + item.victim_trx_id = victim_trx->id; + item.bf_thd_id = thd_get_thread_id(bf_thd); + item.bf_trx_id = bf_trx ? bf_trx->id : TRX_ID_MAX; + item.signal = signal; + item.wait_lock = (victim_trx->lock.wait_lock ? true : false); - switch (wsrep_thd_get_conflict_state(thd)) { - case NO_CONFLICT: - wsrep_thd_set_conflict_state(thd, MUST_ABORT); - break; - case MUST_ABORT: - WSREP_DEBUG("victim " TRX_ID_FMT " in MUST ABORT state", - victim_trx->id); - wsrep_thd_UNLOCK(thd); - wsrep_thd_awake(thd, signal); - DBUG_RETURN(0); - break; - case ABORTED: - case ABORTING: // fall through - default: - WSREP_DEBUG("victim " TRX_ID_FMT " in state %d", - victim_trx->id, wsrep_thd_get_conflict_state(thd)); - wsrep_thd_UNLOCK(thd); - DBUG_RETURN(0); - break; + /* If victim itself is waiting a lock, cancel wait lock. */ + if (victim_trx->lock.wait_lock) { + lock_cancel_waiting_and_release(victim_trx->lock.wait_lock); } - switch (wsrep_thd_query_state(thd)) { - case QUERY_COMMITTING: - enum wsrep_status rcode; - - WSREP_DEBUG("kill query for: %ld", - thd_get_thread_id(thd)); - WSREP_DEBUG("kill trx QUERY_COMMITTING for " TRX_ID_FMT, - victim_trx->id); - - if (wsrep_thd_exec_mode(thd) == REPL_RECV) { - wsrep_abort_slave_trx(bf_seqno, - wsrep_thd_trx_seqno(thd)); - } else { - wsrep_t *wsrep= get_wsrep(); - rcode = wsrep->abort_pre_commit( - wsrep, bf_seqno, - (wsrep_trx_id_t)victim_trx->id - ); - - switch (rcode) { - case WSREP_WARNING: - WSREP_DEBUG("cancel commit warning: " - TRX_ID_FMT, - victim_trx->id); - wsrep_thd_UNLOCK(thd); - wsrep_thd_awake(thd, signal); - DBUG_RETURN(1); - break; - case WSREP_OK: - break; - default: - WSREP_ERROR( - "cancel commit bad exit: %d " - TRX_ID_FMT, - rcode, - victim_trx->id); - /* unable to interrupt, must abort */ - /* note: kill_mysql() will block, if we cannot. - * kill the lock holder first. - */ - abort(); - break; - } - } - wsrep_thd_UNLOCK(thd); - wsrep_thd_awake(thd, signal); - break; - case QUERY_EXEC: - /* it is possible that victim trx is itself waiting for some - * other lock. We need to cancel this waiting - */ - WSREP_DEBUG("kill trx QUERY_EXEC for " TRX_ID_FMT, - victim_trx->id); - - victim_trx->lock.was_chosen_as_deadlock_victim= TRUE; - if (victim_trx->lock.wait_lock) { - WSREP_DEBUG("victim has wait flag: %ld", - thd_get_thread_id(thd)); - lock_t* wait_lock = victim_trx->lock.wait_lock; - if (wait_lock) { - WSREP_DEBUG("canceling wait lock"); - victim_trx->lock.was_chosen_as_deadlock_victim= TRUE; - lock_cancel_waiting_and_release(wait_lock); - } - - wsrep_thd_UNLOCK(thd); - wsrep_thd_awake(thd, signal); - } else { - /* abort currently executing query */ - DBUG_PRINT("wsrep",("sending KILL_QUERY to: %lu", - thd_get_thread_id(thd))); - WSREP_DEBUG("kill query for: %ld", - thd_get_thread_id(thd)); - /* Note that innobase_kill_query will take lock_mutex - and trx_mutex */ - wsrep_thd_UNLOCK(thd); - wsrep_thd_awake(thd, signal); - - /* for BF thd, we need to prevent him from committing */ - if (wsrep_thd_exec_mode(thd) == REPL_RECV) { - wsrep_abort_slave_trx(bf_seqno, - wsrep_thd_trx_seqno(thd)); - } - } - break; - case QUERY_IDLE: - { - WSREP_DEBUG("kill IDLE for " TRX_ID_FMT, victim_trx->id); + wsrep_enqueue_background_kill(item); - if (wsrep_thd_exec_mode(thd) == REPL_RECV) { - WSREP_DEBUG("kill BF IDLE, seqno: %lld", - (long long)wsrep_thd_trx_seqno(thd)); - wsrep_thd_UNLOCK(thd); - wsrep_abort_slave_trx(bf_seqno, - wsrep_thd_trx_seqno(thd)); - DBUG_RETURN(0); - } - /* This will lock thd from proceeding after net_read() */ - wsrep_thd_set_conflict_state(thd, ABORTING); + DBUG_RETURN(0); +} - wsrep_lock_rollback(); +/** + This function forces the victim transaction to abort. Aborting the + transaction does NOT end it, it still has to be rolled back. - if (wsrep_aborting_thd_contains(thd)) { - WSREP_WARN("duplicate thd aborter %lu", - thd_get_thread_id(thd)); - } else { - wsrep_aborting_thd_enqueue(thd); - DBUG_PRINT("wsrep",("enqueuing trx abort for %lu", - thd_get_thread_id(thd))); - WSREP_DEBUG("enqueuing trx abort for (%lu)", - thd_get_thread_id(thd)); - } + @param bf_thd brute force THD asking for the abort + @param victim_thd victim THD to be aborted - DBUG_PRINT("wsrep",("signalling wsrep rollbacker")); - WSREP_DEBUG("signaling aborter"); - wsrep_unlock_rollback(); - wsrep_thd_UNLOCK(thd); + @return 0 victim was aborted + @return -1 victim thread was aborted (no transaction) +*/ +static +int +wsrep_abort_transaction( + handlerton* hton, + THD *bf_thd, + THD *victim_thd, + my_bool signal) +{ + DBUG_ENTER("wsrep_innobase_abort_thd"); - break; - } - default: - WSREP_WARN("bad wsrep query state: %d", - wsrep_thd_query_state(thd)); - wsrep_thd_UNLOCK(thd); - break; - } + ut_a(bf_thd); + ut_a(victim_thd); - DBUG_RETURN(0); -} + trx_t* victim_trx = thd_to_trx(victim_thd); -static int -wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd, - my_bool signal) -{ - DBUG_ENTER("wsrep_innobase_abort_thd"); - trx_t* victim_trx = thd_to_trx(victim_thd); - trx_t* bf_trx = (bf_thd) ? thd_to_trx(bf_thd) : NULL; - WSREP_DEBUG("abort transaction: BF: %s victim: %s", - wsrep_thd_query(bf_thd), - wsrep_thd_query(victim_thd)); + WSREP_DEBUG("Abort transaction: BF: %s victim: %s victim conf: %d", + wsrep_thd_query(bf_thd), + wsrep_thd_query(victim_thd), + wsrep_thd_conflict_state(victim_thd, FALSE)); if (victim_trx) { lock_mutex_enter(); trx_mutex_enter(victim_trx); - victim_trx->abort_type = TRX_WSREP_ABORT; - int rcode = wsrep_innobase_kill_one_trx(bf_thd, bf_trx, - victim_trx, signal); - trx_mutex_exit(victim_trx); + int rcode = wsrep_innobase_kill_one_trx(bf_thd, + victim_trx, + signal); lock_mutex_exit(); - victim_trx->abort_type = TRX_SERVER_ABORT; + trx_mutex_exit(victim_trx); wsrep_srv_conc_cancel_wait(victim_trx); DBUG_RETURN(rcode); } else { WSREP_DEBUG("victim does not have transaction"); wsrep_thd_LOCK(victim_thd); wsrep_thd_set_conflict_state(victim_thd, MUST_ABORT); - wsrep_thd_UNLOCK(victim_thd); wsrep_thd_awake(victim_thd, signal); } diff --git a/storage/xtradb/include/ha_prototypes.h b/storage/xtradb/include/ha_prototypes.h index e5b545e0727..cd5c2dc2f56 100644 --- a/storage/xtradb/include/ha_prototypes.h +++ b/storage/xtradb/include/ha_prototypes.h @@ -303,10 +303,9 @@ innobase_casedn_str( #ifdef WITH_WSREP UNIV_INTERN int -wsrep_innobase_kill_one_trx(void * const thd_ptr, - const trx_t * const bf_trx, +wsrep_innobase_kill_one_trx(THD* thd_ptr, trx_t *victim_trx, - ibool signal); + my_bool signal); int wsrep_innobase_mysql_sort(int mysql_type, uint charset_number, unsigned char* str, unsigned int str_length, unsigned int buf_length); diff --git a/storage/xtradb/lock/lock0lock.cc b/storage/xtradb/lock/lock0lock.cc index 4d40111ac20..9f12e006fcb 100644 --- a/storage/xtradb/lock/lock0lock.cc +++ b/storage/xtradb/lock/lock0lock.cc @@ -1773,7 +1773,7 @@ wsrep_kill_victim( if (!trx->is_wsrep()) return; my_bool bf_this = wsrep_thd_is_BF(trx->mysql_thd, FALSE); - my_bool bf_other = wsrep_thd_is_BF(lock->trx->mysql_thd, TRUE); + my_bool bf_other = wsrep_thd_is_BF(lock->trx->mysql_thd, FALSE); if ((bf_this && !bf_other) || (bf_this && bf_other && wsrep_trx_order_before( @@ -1818,10 +1818,8 @@ wsrep_kill_victim( } } - lock->trx->abort_type = TRX_WSREP_ABORT; wsrep_innobase_kill_one_trx(trx->mysql_thd, - (const trx_t*) trx, lock->trx, TRUE); - lock->trx->abort_type = TRX_SERVER_ABORT; + lock->trx, true); } } } |