diff options
35 files changed, 1021 insertions, 342 deletions
diff --git a/include/mysql/service_wsrep.h b/include/mysql/service_wsrep.h index 923ba57fcdc..9a5f9de3c03 100644 --- a/include/mysql/service_wsrep.h +++ b/include/mysql/service_wsrep.h @@ -70,6 +70,7 @@ struct xid_t; struct wsrep; struct wsrep_ws_handle; struct wsrep_buf; +typedef struct wsrep_kill wsrep_kill_t; extern struct wsrep_service_st { struct wsrep * (*get_wsrep_func)(); @@ -114,7 +115,8 @@ extern struct wsrep_service_st { int (*wsrep_trx_order_before_func)(MYSQL_THD, MYSQL_THD); void (*wsrep_unlock_rollback_func)(); void (*wsrep_set_data_home_dir_func)(const char *data_dir); - my_bool (*wsrep_thd_is_applier_func)(MYSQL_THD); + my_bool (*wsrep_thd_is_applier_func)(MYSQL_THD thd); + bool (*wsrep_enqueue_background_kill_func)(wsrep_kill_t); } *wsrep_service; #ifdef MYSQL_DYNAMIC_PLUGIN @@ -161,6 +163,7 @@ extern struct wsrep_service_st { #define wsrep_unlock_rollback() wsrep_service->wsrep_unlock_rollback_func() #define wsrep_set_data_home_dir(A) wsrep_service->wsrep_set_data_home_dir_func(A) #define wsrep_thd_is_applier(T) wsrep_service->wsrep_thd_is_applier_func(T) +#define wsrep_enqueue_background_kill(T) wsrep_service->wsrep_enqueue_background_kill_func(T); #define wsrep_debug get_wsrep_debug() #define wsrep_log_conflicts get_wsrep_log_conflicts() @@ -223,6 +226,7 @@ bool wsrep_thd_ignore_table(THD *thd); void wsrep_unlock_rollback(); void wsrep_set_data_home_dir(const char *data_dir); my_bool wsrep_thd_is_applier(MYSQL_THD thd); +bool wsrep_enqueue_background_kill(wsrep_kill_t); #endif #ifdef __cplusplus diff --git a/mysql-test/suite/galera/disabled.def b/mysql-test/suite/galera/disabled.def index b4dd37ea7cb..3929e2e5b9c 100644 --- a/mysql-test/suite/galera/disabled.def +++ b/mysql-test/suite/galera/disabled.def @@ -10,7 +10,6 @@ # ############################################################################## -MW-286 : MDEV-18464 Killing thread can cause mutex deadlock if done concurrently with Galera/replication victim kill MW-328A : MDEV-22666 galera.MW-328A MTR failed: "Semaphore wait has lasted > 600 seconds" and do not release port 16002 MW-328B : MDEV-22666 galera.MW-328A MTR failed: "Semaphore wait has lasted > 600 seconds" and do not release port 16002 MW-329 : MDEV-19962 Galera test failure on MW-329 diff --git a/mysql-test/suite/galera/r/galera_bf_kill.result b/mysql-test/suite/galera/r/galera_bf_kill.result new file mode 100644 index 00000000000..65480ee14f9 --- /dev/null +++ b/mysql-test/suite/galera/r/galera_bf_kill.result @@ -0,0 +1,59 @@ +connection node_2; +CREATE TABLE t1(a int not null primary key auto_increment,b int) engine=InnoDB; +insert into t1 values (NULL,1); +connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2a; +begin; +update t1 set a = 5; +connection node_2; +select * from t1; +a b +2 1 +disconnect node_2a; +connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2a; +begin; +update t1 set a =5; +connection node_2; +select * from t1; +a b +2 1 +disconnect node_2a; +connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2a; +begin; +update t1 set a =5, b=2; +connection node_1; +ALTER TABLE t1 ADD UNIQUE KEY bcc(b); +select * from t1; +a b +2 1 +ALTER TABLE t1 DROP KEY bcc; +disconnect node_2a; +connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2a; +begin; +update t1 set a =5, b=2; +connect node_2b, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2b; +begin; +update t1 set a =6, b=7; +connection node_1; +ALTER TABLE t1 ADD UNIQUE KEY bcc(b); +select * from t1; +a b +2 1 +disconnect node_2a; +disconnect node_2b; +connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2a; +set session wsrep_on=OFF; +begin; +update t1 set a =5, b=2; +connection node_1; +ALTER TABLE t1 DROP KEY bcc; +select * from t1; +a b +2 1 +drop table t1; +disconnect node_2a; diff --git a/mysql-test/suite/galera/r/galera_insert_multi.result b/mysql-test/suite/galera/r/galera_insert_multi.result index 913dd42403a..2fbc9eb50d3 100644 --- a/mysql-test/suite/galera/r/galera_insert_multi.result +++ b/mysql-test/suite/galera/r/galera_insert_multi.result @@ -4,13 +4,13 @@ INSERT INTO t1 VALUES (1),(2); connection node_2; INSERT INTO t1 VALUES (3),(4); connection node_1; -SELECT COUNT(*) = 4 FROM t1; -COUNT(*) = 4 -1 +SELECT COUNT(*) AS EXPECT_4 FROM t1; +EXPECT_4 +4 connection node_2; -SELECT COUNT(*) = 4 FROM t1; -COUNT(*) = 4 -1 +SELECT COUNT(*) AS EXPECT_4 FROM t1; +EXPECT_4 +4 DROP TABLE t1; connection node_2; CREATE TABLE t1 (f1 INTEGER, KEY (f1)) ENGINE=InnoDB; @@ -18,25 +18,25 @@ INSERT INTO t1 VALUES (1),(1); connection node_1; INSERT INTO t1 VALUES (2),(2); connection node_2; -SELECT COUNT(*) = 4 FROM t1; -COUNT(*) = 4 -1 +SELECT COUNT(*) AS EXPECT_4 FROM t1; +EXPECT_4 +4 connection node_1; -SELECT COUNT(*) = 4 FROM t1; -COUNT(*) = 4 -1 +SELECT COUNT(*) AS EXPECT_4 FROM t1; +EXPECT_4 +4 DROP TABLE t1; connection node_1; CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB; INSERT INTO t1 VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (1); ERROR 23000: Duplicate entry '1' for key 'PRIMARY' -SELECT COUNT(*) = 0 FROM t1; -COUNT(*) = 0 -1 +SELECT COUNT(*) AS EXPECT_0 FROM t1; +EXPECT_0 +0 connection node_2; -SELECT COUNT(*) = 0 FROM t1; -COUNT(*) = 0 -1 +SELECT COUNT(*) AS EXPECT_0 FROM t1; +EXPECT_0 +0 DROP TABLE t1; connection node_1; CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB; @@ -67,11 +67,11 @@ connection node_1; ROLLBACK; connection node_2; COMMIT; -SELECT COUNT(*) = 2 FROM t1; -COUNT(*) = 2 -1 +SELECT COUNT(*) AS EXPECT_2 FROM t1; +EXPECT_2 +2 connection node_1; -SELECT COUNT(*) = 2 FROM t1; -COUNT(*) = 2 -1 +SELECT COUNT(*) AS EXPECT_2 FROM t1; +EXPECT_2 +2 DROP TABLE t1; diff --git a/mysql-test/suite/galera/r/galera_performance_schema.result b/mysql-test/suite/galera/r/galera_performance_schema.result index 5b4994556d6..22b72572759 100644 --- a/mysql-test/suite/galera/r/galera_performance_schema.result +++ b/mysql-test/suite/galera/r/galera_performance_schema.result @@ -4,6 +4,7 @@ FROM threads WHERE name LIKE 'thread/sql/wsrep%' ORDER BY name; name thread/sql/wsrep_applier_thread +name thread/sql/wsrep_killer_thread name thread/sql/wsrep_rollbacker_thread use test; create table t1 (a int not null primary key) engine=innodb; @@ -12,6 +13,7 @@ use performance_schema; select name from mutex_instances where name like 'wait/synch/mutex/sql/LOCK_wsrep%' order by name; name wait/synch/mutex/sql/LOCK_wsrep_config_state name wait/synch/mutex/sql/LOCK_wsrep_desync +name wait/synch/mutex/sql/LOCK_wsrep_kill name wait/synch/mutex/sql/LOCK_wsrep_ready name wait/synch/mutex/sql/LOCK_wsrep_replaying name wait/synch/mutex/sql/LOCK_wsrep_rollback @@ -19,6 +21,7 @@ name wait/synch/mutex/sql/LOCK_wsrep_slave_threads name wait/synch/mutex/sql/LOCK_wsrep_sst name wait/synch/mutex/sql/LOCK_wsrep_sst_init select name from cond_instances where name like 'wait/synch/cond/sql/COND_wsrep%' order by name; +name wait/synch/cond/sql/COND_wsrep_kill name wait/synch/cond/sql/COND_wsrep_ready name wait/synch/cond/sql/COND_wsrep_replaying name wait/synch/cond/sql/COND_wsrep_rollback diff --git a/mysql-test/suite/galera/r/galera_serializable.result b/mysql-test/suite/galera/r/galera_serializable.result index be3f93a081f..8da10ba0954 100644 --- a/mysql-test/suite/galera/r/galera_serializable.result +++ b/mysql-test/suite/galera/r/galera_serializable.result @@ -7,6 +7,8 @@ SELECT * FROM t1; id f2 connection node_2; INSERT INTO t1 VALUES (1,1); +connect node_1a, 127.0.0.1, root, , test, $NODE_MYPORT_1; +connection node_1a; connection node_1; SELECT * FROM t1; ERROR 40001: Deadlock: wsrep aborted transaction @@ -20,6 +22,7 @@ id f2 1 1 connection node_2; UPDATE t1 SET f2 = 2; +connection node_1a; connection node_1; UPDATE t1 SET f2 = 3; ERROR 40001: Deadlock: wsrep aborted transaction @@ -27,11 +30,18 @@ ROLLBACK; DELETE FROM t1; connection node_1; START TRANSACTION; -connection node_1; INSERT INTO t1 VALUES (1,1); connection node_2; INSERT INTO t1 VALUES (1,2); connection node_1; COMMIT; ERROR 40001: Deadlock: wsrep aborted transaction +SELECT * from t1; +id f2 +1 2 +connection node_2; +SELECT * from t1; +id f2 +1 2 DROP TABLE t1; +disconnect node_1a; diff --git a/mysql-test/suite/galera/t/MW-286.cnf b/mysql-test/suite/galera/t/MW-286.cnf new file mode 100644 index 00000000000..8ec4c2333cb --- /dev/null +++ b/mysql-test/suite/galera/t/MW-286.cnf @@ -0,0 +1,10 @@ +!include ../galera_2nodes.cnf + +[mysqld.1] +wsrep-debug=ON +wsrep-log-conflicts=ON + +[mysqld.2] +wsrep-debug=ON +wsrep-log-conflicts=ON + diff --git a/mysql-test/suite/galera/t/MW-286.test b/mysql-test/suite/galera/t/MW-286.test index 426b4493bb7..fcfbf4c62c3 100644 --- a/mysql-test/suite/galera/t/MW-286.test +++ b/mysql-test/suite/galera/t/MW-286.test @@ -3,8 +3,6 @@ # --source include/galera_cluster.inc ---source include/have_innodb.inc ---source include/big_test.inc --connection node_1 CREATE TABLE ten (f1 INTEGER) Engine=InnoDB; diff --git a/mysql-test/suite/galera/t/galera_as_slave_gtid_myisam.test b/mysql-test/suite/galera/t/galera_as_slave_gtid_myisam.test index 940c9c0667d..54468426e68 100644 --- a/mysql-test/suite/galera/t/galera_as_slave_gtid_myisam.test +++ b/mysql-test/suite/galera/t/galera_as_slave_gtid_myisam.test @@ -69,9 +69,13 @@ DROP TABLE t1; reset master; --connection node_2 +--let $wait_condition = SELECT COUNT(*) = 0 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 't1'; +--source include/wait_condition.inc STOP SLAVE; RESET SLAVE ALL; reset master; --connection node_3 +--let $wait_condition = SELECT COUNT(*) = 0 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 't1'; +--source include/wait_condition.inc reset master; diff --git a/mysql-test/suite/galera/t/galera_bf_abort_sleep.test b/mysql-test/suite/galera/t/galera_bf_abort_sleep.test index 8d135dc7d42..304e13fafd1 100644 --- a/mysql-test/suite/galera/t/galera_bf_abort_sleep.test +++ b/mysql-test/suite/galera/t/galera_bf_abort_sleep.test @@ -8,8 +8,8 @@ CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB; --connection node_2 -SET AUTOCOMMIT=OFF; --let $wsrep_local_bf_aborts_before = `SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_local_bf_aborts'` +SET AUTOCOMMIT=OFF; INSERT INTO t1 VALUES (1); --send SELECT SLEEP(1000); diff --git a/mysql-test/suite/galera/t/galera_bf_kill.cnf b/mysql-test/suite/galera/t/galera_bf_kill.cnf new file mode 100644 index 00000000000..8ec4c2333cb --- /dev/null +++ b/mysql-test/suite/galera/t/galera_bf_kill.cnf @@ -0,0 +1,10 @@ +!include ../galera_2nodes.cnf + +[mysqld.1] +wsrep-debug=ON +wsrep-log-conflicts=ON + +[mysqld.2] +wsrep-debug=ON +wsrep-log-conflicts=ON + diff --git a/mysql-test/suite/galera/t/galera_bf_kill.test b/mysql-test/suite/galera/t/galera_bf_kill.test new file mode 100644 index 00000000000..095e8bd361a --- /dev/null +++ b/mysql-test/suite/galera/t/galera_bf_kill.test @@ -0,0 +1,171 @@ +--source include/galera_cluster.inc + +# +# Test case 1: Start a transaction on node_2a and kill it +# from other connection on same node +# +--connection node_1 +CREATE TABLE t1(a int not null primary key auto_increment,b int) engine=InnoDB; +insert into t1 values (NULL,1); + +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +begin; +update t1 set a = 5; + +--connection node_2 + +--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1 +--source include/wait_condition.inc + +--let $k_thread = `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1` + +--disable_query_log +--eval KILL $k_thread +--enable_query_log + +select * from t1; +--disconnect node_2a +# +# Test case 2: Start a transaction on node_2a and use +# kill query from other connection on same node +# +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +begin; +update t1 set a =5; + +--connection node_2 +--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1 +--source include/wait_condition.inc + +--let $k_thread = `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1` + +--disable_query_log +--eval KILL QUERY $k_thread +--enable_query_log + +select * from t1; +--disconnect node_2a +# +# Test case 3: Start a transaction on node_2a and start a DDL on other node +# that will then abort node_2a transaction +# +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +begin; +update t1 set a =5, b=2; + +--connection node_1 +ALTER TABLE t1 ADD UNIQUE KEY bcc(b); +show create table t1; + +select * from t1; + +ALTER TABLE t1 DROP KEY bcc; +show create table t1; + +--disconnect node_2a +# +# Test case 4: Start a transaction on node_2a and start a DDL on other transaction +# on same node that will then abort node_2a transaction +# +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +begin; +update t1 set a =5, b=2; + +--connection node_2 +ALTER TABLE t1 ADD UNIQUE KEY bcc(b); + +select * from t1; +show create table t1; + +ALTER TABLE t1 DROP KEY bcc; +show create table t1; + +--disconnect node_2a +# +# Test case 5: Start a transaction on node_2a and conflicting transaction on node_2b +# and start a DDL on other node that will then abort node_2a and node_2b +# transactions +# + +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +begin; +update t1 set a =5, b=2; + +--connect node_2b, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2b +begin; +send update t1 set a =6, b=7; + +--connection node_1 +ALTER TABLE t1 ADD UNIQUE KEY bcc(b); + +select * from t1; + +--disconnect node_2a +--disconnect node_2b +# +# Test case 6: Start a transaction on node_2a and conflicting transaction on node_2b +# and start a DDL on node_2 that will then abort node_2a and node_2b +# transactions +# + +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +begin; +update t1 set a =5, b=2; + +--connect node_2b, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2b +begin; +send update t1 set a =6, b=7; + +--connection node_2 +ALTER TABLE t1 DROP KEY bcc; + +select * from t1; +show create table t1; + +--disconnect node_2a +--disconnect node_2b +# +# Test case 7: Start a transaction on node_2a with WSREP_ON=OFF and +# start a DDL on node_2 that will then bf abort node_2a +# +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +set session wsrep_on=OFF; +begin; +update t1 set a =5, b=2; + +--connection node_2 +ALTER TABLE t1 ADD UNIQUE KEY bcc(b); + +select * from t1; + +drop table t1; + +--disconnect node_2a +# +# Test case 8: Start a transaction on node_2a with WSREP_ON=OFF and +# start a DDL on node_1 that will then bf abort node_2a +# +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +set session wsrep_on=OFF; +begin; +update t1 set a =5, b=2; + +--connection node_1 +ALTER TABLE t1 DROP KEY bcc; + +select * from t1; +show create table t1; + +drop table t1; + +--disconnect node_2a diff --git a/mysql-test/suite/galera/t/galera_drop_database.cnf b/mysql-test/suite/galera/t/galera_drop_database.cnf new file mode 100644 index 00000000000..8ec4c2333cb --- /dev/null +++ b/mysql-test/suite/galera/t/galera_drop_database.cnf @@ -0,0 +1,10 @@ +!include ../galera_2nodes.cnf + +[mysqld.1] +wsrep-debug=ON +wsrep-log-conflicts=ON + +[mysqld.2] +wsrep-debug=ON +wsrep-log-conflicts=ON + diff --git a/mysql-test/suite/galera/t/galera_insert_multi.test b/mysql-test/suite/galera/t/galera_insert_multi.test index d62283aff69..e225b4bb199 100644 --- a/mysql-test/suite/galera/t/galera_insert_multi.test +++ b/mysql-test/suite/galera/t/galera_insert_multi.test @@ -13,10 +13,16 @@ INSERT INTO t1 VALUES (1),(2); INSERT INTO t1 VALUES (3),(4); --connection node_1 -SELECT COUNT(*) = 4 FROM t1; +--let $wait_condition = SELECT COUNT(*) = 4 FROM t1; +--source include/wait_condition.inc + +SELECT COUNT(*) AS EXPECT_4 FROM t1; --connection node_2 -SELECT COUNT(*) = 4 FROM t1; +--let $wait_condition = SELECT COUNT(*) = 4 FROM t1; +--source include/wait_condition.inc + +SELECT COUNT(*) AS EXPECT_4 FROM t1; DROP TABLE t1; @@ -32,10 +38,16 @@ INSERT INTO t1 VALUES (1),(1); INSERT INTO t1 VALUES (2),(2); --connection node_2 -SELECT COUNT(*) = 4 FROM t1; +--let $wait_condition = SELECT COUNT(*) = 4 FROM t1; +--source include/wait_condition.inc + +SELECT COUNT(*) AS EXPECT_4 FROM t1; --connection node_1 -SELECT COUNT(*) = 4 FROM t1; +--let $wait_condition = SELECT COUNT(*) = 4 FROM t1; +--source include/wait_condition.inc + +SELECT COUNT(*) AS EXPECT_4 FROM t1; DROP TABLE t1; @@ -49,10 +61,10 @@ CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB; --error ER_DUP_ENTRY INSERT INTO t1 VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (1); -SELECT COUNT(*) = 0 FROM t1; +SELECT COUNT(*) AS EXPECT_0 FROM t1; --connection node_2 -SELECT COUNT(*) = 0 FROM t1; +SELECT COUNT(*) AS EXPECT_0 FROM t1; DROP TABLE t1; @@ -107,10 +119,13 @@ ROLLBACK; --connection node_2 COMMIT; -SELECT COUNT(*) = 2 FROM t1; +SELECT COUNT(*) AS EXPECT_2 FROM t1; --connection node_1 -SELECT COUNT(*) = 2 FROM t1; +--let $wait_condition = SELECT COUNT(*) = 2 FROM t1; +--source include/wait_condition.inc + +SELECT COUNT(*) AS EXPECT_2 FROM t1; DROP TABLE t1; diff --git a/mysql-test/suite/galera/t/galera_serializable.test b/mysql-test/suite/galera/t/galera_serializable.test index b12d57fd488..acbf99810a2 100644 --- a/mysql-test/suite/galera/t/galera_serializable.test +++ b/mysql-test/suite/galera/t/galera_serializable.test @@ -7,7 +7,6 @@ # --source include/galera_cluster.inc ---source include/have_innodb.inc --connection node_1 @@ -26,7 +25,12 @@ SELECT * FROM t1; --connection node_2 INSERT INTO t1 VALUES (1,1); ---sleep 2 +--connect node_1a, 127.0.0.1, root, , test, $NODE_MYPORT_1 +--connection node_1a +--let $wait_condition = SELECT COUNT(*) = 1 FROM t1; +--error 0,ER_LOCK_DEADLOCK +--source include/wait_condition.inc + --connection node_1 --error ER_LOCK_DEADLOCK SELECT * FROM t1; @@ -46,7 +50,11 @@ SELECT * FROM t1; --connection node_2 UPDATE t1 SET f2 = 2; ---sleep 2 +--connection node_1a +--let $wait_condition = SELECT COUNT(*) = 1 FROM t1; +--error 0,ER_LOCK_DEADLOCK +--source include/wait_condition.inc + --connection node_1 --error ER_LOCK_DEADLOCK UPDATE t1 SET f2 = 3; @@ -62,8 +70,6 @@ DELETE FROM t1; --connection node_1 START TRANSACTION; - ---connection node_1 INSERT INTO t1 VALUES (1,1); --connection node_2 @@ -73,4 +79,18 @@ INSERT INTO t1 VALUES (1,2); --error ER_LOCK_DEADLOCK COMMIT; +--let $wait_condition = SELECT COUNT(*) = 1 FROM t1; +--source include/wait_condition.inc + +SELECT * from t1; + +--connection node_2 + +--let $wait_condition = SELECT COUNT(*) = 1 FROM t1; +--source include/wait_condition.inc + +SELECT * from t1; + DROP TABLE t1; + +--disconnect node_1a diff --git a/mysql-test/suite/galera/t/galera_unicode_pk.test b/mysql-test/suite/galera/t/galera_unicode_pk.test index 0d571f5cfd7..242d27b2fa7 100644 --- a/mysql-test/suite/galera/t/galera_unicode_pk.test +++ b/mysql-test/suite/galera/t/galera_unicode_pk.test @@ -12,6 +12,9 @@ CREATE TABLE t1 ( INSERT INTO t1 VALUES ('текст'); --connection node_2 +--let $wait_condition = SELECT COUNT(*) = 1 FROM t1; +--source include/wait_condition.inc + SELECT f1 = 'текст' FROM t1; # diff --git a/mysql-test/suite/galera/t/jan.test b/mysql-test/suite/galera/t/jan.test new file mode 100644 index 00000000000..48260749a68 --- /dev/null +++ b/mysql-test/suite/galera/t/jan.test @@ -0,0 +1,20 @@ +--source include/galera_cluster.inc + +--connection node_1 +CREATE TABLE t1(a int not null primary key auto_increment,b int) engine=InnoDB; +insert into t1 values (NULL,1); + +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +set session wsrep_on=OFF; +begin; +update t1 set a =5, b=2; + +--connection node_2 +ALTER TABLE t1 ADD UNIQUE KEY bcc(b); + +select * from t1; + +drop table t1; + +--disconnect node_2a diff --git a/mysql-test/suite/wsrep/r/variables.result b/mysql-test/suite/wsrep/r/variables.result index 3abc861f3d0..96dcee47206 100644 --- a/mysql-test/suite/wsrep/r/variables.result +++ b/mysql-test/suite/wsrep/r/variables.result @@ -149,7 +149,7 @@ VARIABLE_VALUE 1 SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_thread_count'; VARIABLE_VALUE -2 +3 SELECT @@global.wsrep_provider; @@global.wsrep_provider libgalera_smm.so @@ -164,7 +164,7 @@ Variable_name Value Threads_connected 1 SHOW STATUS LIKE 'wsrep_thread_count'; Variable_name Value -wsrep_thread_count 2 +wsrep_thread_count 3 SET @wsrep_slave_threads_saved= @@global.wsrep_slave_threads; SET GLOBAL wsrep_slave_threads= 10; @@ -177,7 +177,7 @@ VARIABLE_VALUE 1 SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_thread_count'; VARIABLE_VALUE -11 +12 SHOW STATUS LIKE 'threads_connected'; Variable_name Value Threads_connected 1 diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 0a8c136e556..f3e3489e903 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -652,7 +652,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) wsrep_po_handle(WSREP_PO_INITIALIZER), wsrep_po_cnt(0), wsrep_apply_format(0), - wsrep_ignore_table(false) + wsrep_ignore_table(false), + wsrep_aborter(0) #endif { ulong tmp; @@ -790,6 +791,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) wsrep_replicate_GTID = false; wsrep_skip_wsrep_GTID = false; wsrep_split_flag = false; + wsrep_aborter = 0; #endif /* Call to init() below requires fully initialized Open_tables_state. */ reset_open_tables_state(this); diff --git a/sql/sql_class.h b/sql/sql_class.h index 8d8ab779d56..ed19d0c9d82 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -3753,13 +3753,23 @@ public: Resetting killed has to be done under a mutex to ensure its not done during an awake() call. */ +#ifdef WITH_WSREP + mysql_mutex_assert_not_owner(&LOCK_thd_kill); + mysql_mutex_assert_not_owner(&LOCK_thd_data); + mysql_mutex_lock(&LOCK_thd_data); +#endif + mysql_mutex_lock(&LOCK_thd_kill); if (killed != NOT_KILLED) { - mysql_mutex_lock(&LOCK_thd_kill); killed= NOT_KILLED; killed_err= 0; - mysql_mutex_unlock(&LOCK_thd_kill); } + +#ifdef WITH_WSREP + wsrep_aborter= 0; + mysql_mutex_unlock(&LOCK_thd_data); +#endif + mysql_mutex_unlock(&LOCK_thd_kill); } inline void reset_kill_query() { @@ -4421,6 +4431,8 @@ public: #ifdef WITH_WSREP const bool wsrep_applier; /* dedicated slave applier thread */ + bool wsrep_killer; /* dedicated background + kill thread */ bool wsrep_applier_closing; /* applier marked to close */ bool wsrep_client_thread; /* to identify client threads*/ bool wsrep_PA_safe; @@ -4470,6 +4482,8 @@ public: registered again, but replication of last chunk of rows is skipped by the innodb engine: */ bool wsrep_split_flag; + /* thread who has started kill for this THD protected by LOCK_thd_data*/ + my_thread_id wsrep_aborter; #endif /* WITH_WSREP */ /* Handling of timeouts for commands */ diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 543c877b7f1..a755844e74e 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -8896,8 +8896,15 @@ kill_one_thread(THD *thd, longlong id, killed_state kill_signal, killed_type typ thd->security_ctx->user_matches(tmp->security_ctx)) && !wsrep_thd_is_BF(tmp, false)) { - tmp->awake(kill_signal); - error=0; +#ifdef WITH_WSREP + /* We allow kill to continue only if there is no concurrent + kill in processing. */ + if (wsrep_thd_set_wsrep_aborter(tmp, thd)) +#endif + { + tmp->awake(kill_signal); + error=0; + } } else error= (type == KILL_TYPE_QUERY ? ER_KILL_QUERY_DENIED_ERROR : diff --git a/sql/sql_plugin_services.ic b/sql/sql_plugin_services.ic index 20113444b64..d951f063489 100644 --- a/sql/sql_plugin_services.ic +++ b/sql/sql_plugin_services.ic @@ -184,7 +184,8 @@ static struct wsrep_service_st wsrep_handler = { wsrep_trx_order_before, wsrep_unlock_rollback, wsrep_set_data_home_dir, - wsrep_thd_is_applier + wsrep_thd_is_applier, + wsrep_enqueue_background_kill }; static struct thd_specifics_service_st thd_specifics_handler= diff --git a/sql/wsrep_dummy.cc b/sql/wsrep_dummy.cc index d8ab86c25f2..b7507136fcc 100644 --- a/sql/wsrep_dummy.cc +++ b/sql/wsrep_dummy.cc @@ -16,6 +16,7 @@ #include <my_global.h> #include <sql_class.h> #include <mysql/service_wsrep.h> +#include "wsrep_mysqld.h" my_bool wsrep_thd_is_BF(THD *, my_bool) { return 0; } @@ -154,3 +155,9 @@ void wsrep_log(void (*)(const char *, ...), const char *, ...) my_bool wsrep_thd_is_applier(MYSQL_THD thd) { return false; } + +bool wsrep_enqueue_background_kill(wsrep_kill_t item) +{ return false;} + +bool wsrep_thd_set_wsrep_aborter(THD *victim_thd, THD *bf_thd) +{ return true; } diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc index 575c57c5d24..81d6144d6cf 100644 --- a/sql/wsrep_hton.cc +++ b/sql/wsrep_hton.cc @@ -140,10 +140,10 @@ void wsrep_post_commit(THD* thd, bool all) => cleanup */ if (thd->wsrep_conflict_state != MUST_REPLAY) - { - WSREP_DEBUG("cleanup transaction for LOCAL_STATE: %s", - WSREP_QUERY(thd)); - } + { + WSREP_DEBUG("cleanup transaction for LOCAL_STATE: %s", + wsrep_thd_query(thd)); + } /* Run post-rollback hook to clean up in the case if some keys were populated for the transaction in provider diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index d392d1c2a61..2feb2030fbe 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -38,7 +38,8 @@ #include <cstdlib> #include "log_event.h" #include "sql_plugin.h" /* wsrep_plugins_pre_init() */ -#include <vector> +#include <list> +#include <algorithm> wsrep_t *wsrep = NULL; /* @@ -133,6 +134,8 @@ mysql_cond_t COND_wsrep_replaying; mysql_mutex_t LOCK_wsrep_slave_threads; mysql_mutex_t LOCK_wsrep_desync; mysql_mutex_t LOCK_wsrep_config_state; +mysql_mutex_t LOCK_wsrep_kill; +mysql_cond_t COND_wsrep_kill; int wsrep_replaying= 0; ulong wsrep_running_threads = 0; // # of currently running wsrep @@ -140,6 +143,7 @@ ulong wsrep_running_threads = 0; // # of currently running wsrep ulong wsrep_running_applier_threads = 0; // # of running applier threads ulong wsrep_running_rollbacker_threads = 0; // # of running // # rollbacker threads +ulong wsrep_running_killer_threads = 0; ulong my_bind_addr; #ifdef HAVE_PSI_INTERFACE @@ -147,11 +151,13 @@ PSI_mutex_key key_LOCK_wsrep_rollback, key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst, key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init, key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync, - key_LOCK_wsrep_config_state; + key_LOCK_wsrep_config_state, + key_LOCK_wsrep_kill; PSI_cond_key key_COND_wsrep_rollback, key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst, - key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread; + key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread, + key_COND_wsrep_kill; PSI_file_key key_file_wsrep_gra_log; @@ -166,7 +172,8 @@ static PSI_mutex_info wsrep_mutexes[]= { &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL}, - { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL} + { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_kill, "LOCK_wsrep_kill", PSI_FLAG_GLOBAL} }; static PSI_cond_info wsrep_conds[]= @@ -176,7 +183,8 @@ static PSI_cond_info wsrep_conds[]= { &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL}, { &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0}, { &key_COND_wsrep_rollback, "COND_wsrep_rollback", PSI_FLAG_GLOBAL}, - { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL} + { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}, + { &key_COND_wsrep_kill, "COND_wsrep_kill", PSI_FLAG_GLOBAL} }; static PSI_file_info wsrep_files[]= @@ -185,14 +193,15 @@ static PSI_file_info wsrep_files[]= }; PSI_thread_key key_wsrep_sst_joiner, key_wsrep_sst_donor, - key_wsrep_rollbacker, key_wsrep_applier; + key_wsrep_rollbacker, key_wsrep_applier, key_wsrep_killer; static PSI_thread_info wsrep_threads[]= { {&key_wsrep_sst_joiner, "wsrep_sst_joiner_thread", PSI_FLAG_GLOBAL}, {&key_wsrep_sst_donor, "wsrep_sst_donor_thread", PSI_FLAG_GLOBAL}, {&key_wsrep_rollbacker, "wsrep_rollbacker_thread", PSI_FLAG_GLOBAL}, - {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL} + {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL}, + {&key_wsrep_killer, "wsrep_killer_thread", PSI_FLAG_GLOBAL} }; #endif /* HAVE_PSI_INTERFACE */ @@ -239,6 +248,7 @@ wsp::Config_state *wsrep_config_state; // if there was no state gap on receiving first view event. static my_bool wsrep_startup = TRUE; +std::list< wsrep_kill_t > wsrep_kill_list; static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) { switch (level) { @@ -831,6 +841,8 @@ void wsrep_thr_init() mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_kill, &LOCK_wsrep_kill, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_kill, &COND_wsrep_kill, NULL); DBUG_VOID_RETURN; } @@ -867,6 +879,7 @@ void wsrep_init_startup (bool first) if (!wsrep_start_replication()) unireg_abort(1); wsrep_create_rollbacker(); + wsrep_create_killer(); wsrep_create_appliers(1); if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed @@ -908,6 +921,8 @@ void wsrep_thr_deinit() mysql_mutex_destroy(&LOCK_wsrep_slave_threads); mysql_mutex_destroy(&LOCK_wsrep_desync); mysql_mutex_destroy(&LOCK_wsrep_config_state); + mysql_mutex_destroy(&LOCK_wsrep_kill); + mysql_cond_destroy(&COND_wsrep_kill); delete wsrep_config_state; wsrep_config_state= 0; // Safety } @@ -1666,7 +1681,7 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, if (wsrep_can_run_in_toi(thd, db_, table_, table_list) == false) { - WSREP_DEBUG("No TOI for %s", WSREP_QUERY(thd)); + WSREP_DEBUG("No TOI for %s", wsrep_thd_query(thd)); return 1; } @@ -2162,9 +2177,13 @@ pthread_handler_t start_wsrep_THD(void *arg) case WSREP_ROLLBACKER_THREAD: wsrep_running_rollbacker_threads++; break; + case WSREP_KILLER_THREAD: + wsrep_running_killer_threads++; + thd->wsrep_killer= true; + break; default: WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type); - break; + assert(0); } mysql_cond_broadcast(&COND_thread_count); @@ -2187,9 +2206,13 @@ pthread_handler_t start_wsrep_THD(void *arg) DBUG_ASSERT(wsrep_running_rollbacker_threads > 0); wsrep_running_rollbacker_threads--; break; + case WSREP_KILLER_THREAD: + DBUG_ASSERT(wsrep_running_killer_threads > 0); + wsrep_running_killer_threads--; + break; default: WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type); - break; + assert(0); } my_free(args); @@ -2436,7 +2459,11 @@ void wsrep_close_client_connections(my_bool wait_to_end, THD *except_caller_thd) } DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count)); - WSREP_DEBUG("waiting for client connections to close: %u", thread_count); + WSREP_DEBUG("Waiting for client connections to close: %u", thread_count); + WSREP_DEBUG("Waiting for rollbacker threads to close: %lu", wsrep_running_rollbacker_threads); + WSREP_DEBUG("Waiting for applier threads to close: %lu", wsrep_running_applier_threads); + WSREP_DEBUG("Waiting for killer threads to close: %lu", wsrep_running_killer_threads); + WSREP_DEBUG("Waiting for wsrep threads to close: %lu", wsrep_running_threads); while (wait_to_end && have_client_connections()) { @@ -2470,7 +2497,7 @@ void wsrep_close_threads(THD *thd) DBUG_PRINT("quit",("Informing thread %lld that it's time to die", (longlong) tmp->thread_id)); /* We skip slave threads & scheduler on this first loop through. */ - if (tmp->wsrep_applier && tmp != thd) + if ((tmp->wsrep_applier || tmp->wsrep_killer) && tmp != thd) { WSREP_DEBUG("closing wsrep thread %lld", (longlong) tmp->thread_id); wsrep_close_thread (tmp); @@ -2484,7 +2511,7 @@ void wsrep_wait_appliers_close(THD *thd) { /* Wait for wsrep appliers to gracefully exit */ mysql_mutex_lock(&LOCK_thread_count); - while (wsrep_running_threads > 1) + while (wsrep_running_threads > 2) // 1 is for rollbacker thread which needs to be killed explicitly. // This gotta be fixed in a more elegant manner if we gonna have arbitrary // number of non-applier wsrep threads. @@ -2760,11 +2787,11 @@ extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id) extern "C" void wsrep_thd_awake(THD *thd, my_bool signal) { + mysql_mutex_assert_owner(&thd->LOCK_thd_data); + if (signal) { - mysql_mutex_lock(&thd->LOCK_thd_data); thd->awake(KILL_QUERY); - mysql_mutex_unlock(&thd->LOCK_thd_data); } else { @@ -2772,6 +2799,8 @@ extern "C" void wsrep_thd_awake(THD *thd, my_bool signal) mysql_cond_broadcast(&COND_wsrep_replaying); mysql_mutex_unlock(&LOCK_wsrep_replaying); } + + mysql_mutex_unlock(&thd->LOCK_thd_data); } @@ -3029,3 +3058,47 @@ bool wsrep_node_is_synced() { return (WSREP_ON) ? (wsrep_config_state->get_status() == 4) : false; } + +bool wsrep_enqueue_background_kill(wsrep_kill_t item) +{ + std::list< wsrep_kill_t >::iterator it; + bool inserted= false; + + mysql_mutex_lock(&LOCK_wsrep_kill); + + for (it = wsrep_kill_list.begin(); it != wsrep_kill_list.end(); it++) + { + if ((*it).victim_thd_id == item.victim_thd_id) + break; + } + + if(it != wsrep_kill_list.end()) + { + WSREP_DEBUG("Thread: %lu already on kill list", item.victim_thd_id); + } + else + { + wsrep_kill_list.push_back(item); + mysql_cond_signal(&COND_wsrep_kill); + inserted= true; + } + + mysql_mutex_unlock(&LOCK_wsrep_kill); + return inserted; +} + +bool wsrep_thd_set_wsrep_aborter(THD *victim_thd, THD *bf_thd) +{ + WSREP_DEBUG("wsrep_thd_set_wsrep_aborter called victim: %llu bf: %llu", + victim_thd->thread_id, bf_thd->thread_id); + mysql_mutex_assert_owner(&victim_thd->LOCK_thd_data); + + if (victim_thd->wsrep_aborter && victim_thd->wsrep_aborter != bf_thd->thread_id) + { + return false; + } + + victim_thd->wsrep_aborter = bf_thd->thread_id; + + return true; +} diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index e28b90885b4..61aa0fcd860 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -18,9 +18,20 @@ #ifndef WSREP_MYSQLD_H #define WSREP_MYSQLD_H +#include <my_config.h> +#include <stdint.h> #include <mysql/plugin.h> #include <mysql/service_wsrep.h> +typedef struct wsrep_kill { + unsigned long victim_thd_id; + unsigned long bf_thd_id; + uint64_t victim_trx_id; + uint64_t bf_trx_id; + bool signal; + bool wait_lock; +} wsrep_kill_t; + #ifdef WITH_WSREP typedef struct st_mysql_show_var SHOW_VAR; @@ -92,6 +103,7 @@ extern my_bool wsrep_slave_UK_checks; extern ulong wsrep_running_threads; extern ulong wsrep_running_applier_threads; extern ulong wsrep_running_rollbacker_threads; +extern ulong wsrep_running_killer_threads; extern bool wsrep_new_cluster; extern bool wsrep_gtid_mode; extern uint32 wsrep_gtid_domain_id; @@ -223,8 +235,6 @@ void wsrep_log(void (*fun)(const char *, ...), const char *format, ...); #define WSREP_PROVIDER_EXISTS \ (wsrep_provider && strncasecmp(wsrep_provider, WSREP_NONE, FN_REFLEN)) -#define WSREP_QUERY(thd) (thd->query()) - extern my_bool wsrep_ready_get(); extern void wsrep_ready_wait(); @@ -254,6 +264,8 @@ extern mysql_cond_t COND_wsrep_replaying; extern mysql_mutex_t LOCK_wsrep_slave_threads; extern mysql_mutex_t LOCK_wsrep_desync; extern mysql_mutex_t LOCK_wsrep_config_state; +extern mysql_mutex_t LOCK_wsrep_kill; +extern mysql_cond_t COND_wsrep_kill; extern wsrep_aborting_thd_t wsrep_aborting_thd; extern my_bool wsrep_emulate_bin_log; extern int wsrep_to_isolation; @@ -278,6 +290,8 @@ extern PSI_mutex_key key_LOCK_wsrep_replaying; extern PSI_cond_key key_COND_wsrep_replaying; extern PSI_mutex_key key_LOCK_wsrep_slave_threads; extern PSI_mutex_key key_LOCK_wsrep_desync; +extern PSI_mutex_key key_LOCK_wsrep_kill; +extern PSI_cond_key key_COND_wsrep_kill; extern PSI_file_key key_file_wsrep_gra_log; @@ -285,6 +299,7 @@ extern PSI_thread_key key_wsrep_sst_joiner; extern PSI_thread_key key_wsrep_sst_donor; extern PSI_thread_key key_wsrep_rollbacker; extern PSI_thread_key key_wsrep_applier; +extern PSI_thread_key key_wsrep_killer; #endif /* HAVE_PSI_INTERFACE */ @@ -311,7 +326,8 @@ void thd_binlog_trx_reset(THD * thd); enum wsrep_thread_type { WSREP_APPLIER_THREAD=1, - WSREP_ROLLBACKER_THREAD=2 + WSREP_ROLLBACKER_THREAD=2, + WSREP_KILLER_THREAD=3 }; typedef void (*wsrep_thd_processor_fun)(THD *); @@ -358,6 +374,8 @@ void wsrep_keys_free(wsrep_key_arr_t* key_arr); ((wsrep_forced_binlog_format != BINLOG_FORMAT_UNSPEC) ? \ wsrep_forced_binlog_format : my_format) +bool wsrep_thd_set_wsrep_aborter(THD *victim_thd, THD *bf_thd); + #else /* WITH_WSREP */ #define WSREP(T) (0) diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc index 1e60088c5f1..63cef835f89 100644 --- a/sql/wsrep_thd.cc +++ b/sql/wsrep_thd.cc @@ -22,11 +22,16 @@ //#include "global_threads.h" // LOCK_thread_count, etc. #include "sql_base.h" // close_thread_tables() #include "mysqld.h" // start_wsrep_THD(); +#include "sql_show.h" // find_thread_by_id #include "slave.h" // opt_log_slave_updates #include "rpl_filter.h" #include "rpl_rli.h" #include "rpl_mi.h" +#include "debug_sync.h" +#include <list> + +extern std::list< wsrep_kill_t > wsrep_kill_list; #if (__LP64__) static volatile int64 wsrep_bf_aborts_counter(0); @@ -279,7 +284,7 @@ void wsrep_replay_sp_transaction(THD* thd) WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s", rcode, (thd->db ? thd->db : "(null)"), - WSREP_QUERY(thd)); + wsrep_thd_query(thd)); /* we're now in inconsistent state, must abort */ mysql_mutex_unlock(&thd->LOCK_thd_data); unireg_abort(1); @@ -529,12 +534,27 @@ static bool create_wsrep_THD(wsrep_thread_args* args, bool thread_count_lock) mysql_mutex_lock(&LOCK_thread_count); ulong old_wsrep_running_threads= wsrep_running_threads; + PSI_thread_key key; - DBUG_ASSERT(args->thread_type == WSREP_APPLIER_THREAD || - args->thread_type == WSREP_ROLLBACKER_THREAD); +#ifdef HAVE_PSI_INTERFACE + switch(args->thread_type) + { + case WSREP_APPLIER_THREAD: + key = key_wsrep_applier; + break; + case WSREP_ROLLBACKER_THREAD: + key = key_wsrep_rollbacker; + break; + case WSREP_KILLER_THREAD: + key = key_wsrep_killer; + break; + default: + WSREP_ERROR("Incorrect thread type %d", args->thread_type); + assert(0); + } +#endif /* HAVE_PSI_INTERFACE */ - bool res= mysql_thread_create(args->thread_type == WSREP_APPLIER_THREAD - ? key_wsrep_applier : key_wsrep_rollbacker, + bool res= mysql_thread_create(key, &args->thread_id, &connection_attrib, start_wsrep_THD, (void*)args); @@ -562,6 +582,360 @@ static bool create_wsrep_THD(wsrep_thread_args* args, bool thread_count_lock) return res; } +static void wsrep_abort_slave_trx(long long bf_seqno, long long victim_seqno) +{ + WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be " + "caused by:\n\t" + "1) unsupported configuration options combination, please check documentation.\n\t" + "2) a bug in the code.\n\t" + "3) a database corruption.\n Node consistency compromized, " + "need to abort. Restart the node to resync with cluster.", + bf_seqno, victim_seqno); + abort(); +} + +static int wsrep_kill(wsrep_kill_t* item) +{ + bool signal= item->signal; + unsigned long long victim_trx_id= static_cast<unsigned long long>(item->victim_trx_id); + unsigned long long bf_trx_id= static_cast<unsigned long long>(item->bf_trx_id); + + // Note that find_thread_by_id will acquire LOCK_thd_data mutex + // for thd if it's found + THD* bf_thd= find_thread_by_id(item->bf_thd_id, false); + + if (!bf_thd) + { + WSREP_ERROR("BF thread: %lu not found", item->bf_thd_id); + return (0); + } + + long long bf_seqno= wsrep_thd_trx_seqno(bf_thd); + + WSREP_DEBUG("Aborter %s trx_id: %llu thread: %ld " + "seqno: %lld query_state: %s conflict_state: %s query: %s", + wsrep_thd_is_BF(bf_thd, false) ? "BF" : "normal", + bf_trx_id, + item->bf_thd_id, + bf_seqno, + wsrep_thd_query_state_str(bf_thd), + wsrep_thd_conflict_state_str(bf_thd), + wsrep_thd_query(bf_thd)); + + // Note that we need to release LOCK_thd_data mutex from BF thread + // to obey safe mutex ordering of LOCK_thread_count -> LOCK_thd_data + // that both are taken on find_thread_by_id + wsrep_thd_UNLOCK(bf_thd); + + THD* thd= find_thread_by_id(item->victim_thd_id, false); + + if (!thd) + { + WSREP_DEBUG("Victim thread: %lu not found", item->victim_thd_id); + return (0); + } + + if (!wsrep_thd_set_wsrep_aborter(thd, bf_thd)) + { + WSREP_DEBUG("Vimtim %s kill already in process for victim: %llu" + " bf: %llu query_state: %s conflict_state: %s query: %s", + wsrep_thd_is_BF(thd, false) ? "BF" : "normal", + thd->thread_id, + bf_thd->thread_id, + wsrep_thd_query_state_str(thd), + wsrep_thd_conflict_state_str(thd), + wsrep_thd_query(thd)); + + wsrep_thd_UNLOCK(thd); + return(0); + } + + WSREP_LOG_CONFLICT(bf_thd, thd, TRUE); + + unsigned long victim_thread= item->victim_thd_id; + long long victim_seqno= wsrep_thd_trx_seqno(thd); + + WSREP_DEBUG("Victim %s trx_id: %llu thread: %ld " + "seqno: %lld query_state: %s conflict_state: %s query: %s", + wsrep_thd_is_BF(thd, false) ? "BF" : "normal", + victim_trx_id, + victim_thread, + victim_seqno, + wsrep_thd_query_state_str(thd), + wsrep_thd_conflict_state_str(thd), + wsrep_thd_query(thd)); + + if (wsrep_thd_query_state(thd) == QUERY_EXITING) + { + WSREP_DEBUG("Victim query state QUERY_EXITING trx: %llu" + " thread: %lu", + victim_trx_id, + victim_thread); + wsrep_thd_UNLOCK(thd); + return(0); + } + + if (wsrep_thd_exec_mode(thd) != LOCAL_STATE) + { + WSREP_DEBUG("Victim withdraw of non local for BF trx: %llu " + ", thread: %lu exec_mode: %s", + victim_trx_id, + victim_thread, + wsrep_thd_exec_mode_str(thd)); + } + + switch (wsrep_thd_get_conflict_state(thd)) + { + case NO_CONFLICT: + WSREP_DEBUG("Victim thread: %lu trx: %llu in NO_CONFLICT state", + victim_thread, + victim_trx_id); + wsrep_thd_set_conflict_state(thd, MUST_ABORT); + break; + case MUST_ABORT: + WSREP_DEBUG("Victim thread: %lu trx: %llu in MUST_ABORT state", + victim_thread, + victim_trx_id); + wsrep_thd_awake(thd, signal); + return(0); + break; + case ABORTED: + case ABORTING: // fall through + default: + WSREP_DEBUG("Victim thread: %lu trx: %llu in state: %s", + victim_thread, + victim_trx_id, + wsrep_thd_conflict_state_str(thd)); + wsrep_thd_UNLOCK(thd); + return(0); + break; + } + + switch (wsrep_thd_query_state(thd)) + { + case QUERY_COMMITTING: + { + enum wsrep_status rcode=WSREP_OK; + + WSREP_DEBUG("Victim kill trx QUERY_COMMITTING state thread: %ld trx: %llu", + victim_thread, + victim_trx_id); + + if (wsrep_thd_exec_mode(thd) == REPL_RECV) + { + WSREP_DEBUG("Victim REPL_RECV abort slave thread: %ld trx: %llu" + " bf_seqno: %lld victim_seqno: %lld", + victim_thread, + victim_trx_id, + bf_seqno, + victim_seqno); + + wsrep_abort_slave_trx(bf_seqno, victim_seqno); + } + else + { + wsrep_t *wsrep= get_wsrep(); + + rcode= wsrep->abort_pre_commit(wsrep, bf_seqno, + (wsrep_trx_id_t)wsrep_thd_ws_handle(thd)->trx_id); + + switch (rcode) + { + case WSREP_WARNING: + { + WSREP_DEBUG("Victim cancel commit warning thread: %lu trx: %llu", + victim_thread, + victim_trx_id); + + wsrep_thd_awake(thd, signal); + return(1); + break; + } + case WSREP_OK: + break; + default: + { + WSREP_ERROR("Victim cancel commit bad commit exit thread: " + "%lu trx: %llu rcode: %d ", + victim_thread, + victim_trx_id, + rcode); + /* unable to interrupt, must abort */ + /* note: kill_mysql() will block, if we cannot. + * kill the lock holder first. */ + abort(); + break; + } + } + } + + wsrep_thd_awake(thd, signal); + break; + } + case QUERY_EXEC: + { + /* it is possible that victim trx is itself waiting for some + * other lock. We need to cancel this waiting */ + WSREP_DEBUG("Victim kill trx QUERY_EXEC state thread: %ld trx: %llu", + victim_thread, victim_trx_id); + + bool wait_lock= item->wait_lock; + + if (wait_lock) + { + WSREP_DEBUG("Victim thread: %lu trx: %llu has lock wait flag", + victim_thread, + victim_trx_id); + + wsrep_thd_awake(thd, signal); + } + else + { + /* Abort currently executing query */ + WSREP_DEBUG("Kill query for victim thread: %lu trx: %llu", + victim_thread, + victim_trx_id); + + wsrep_thd_awake(thd, signal); + + /* for BF thd, we need to prevent him from committing */ + if (wsrep_thd_exec_mode(thd) == REPL_RECV) + { + WSREP_DEBUG("Victim REPL_RECV abort slave for thread: " + "%lu trx: %llu" + " bf_seqno: %lld victim_seqno: %lld", + victim_thread, + victim_trx_id, + bf_seqno, + victim_seqno); + + wsrep_abort_slave_trx(bf_seqno, victim_seqno); + } + } + break; + } + case QUERY_IDLE: + { + WSREP_DEBUG("Victim kill trx QUERY_IDLE state thread: %ld trx: %llu", + victim_thread, + victim_trx_id); + + if (wsrep_thd_exec_mode(thd) == REPL_RECV) + { + WSREP_DEBUG("Victim REPL_RECV kill BF IDLE, thread: %ld trx: " + "%llu bf_seqno: %lld victim_seqno: %lld", + victim_thread, + victim_trx_id, + bf_seqno, + victim_seqno); + + wsrep_thd_UNLOCK(thd); + wsrep_abort_slave_trx(bf_seqno, victim_seqno); + return(0); + } + + /* This will lock thd from proceeding after net_read() */ + wsrep_thd_set_conflict_state(thd, ABORTING); + + wsrep_lock_rollback(); + + if (wsrep_aborting_thd_contains(thd)) + { + WSREP_WARN("Victim is duplicate thd aborter thread: %ld trx: %llu", + victim_thread, + victim_trx_id); + } + else + { + wsrep_aborting_thd_enqueue(thd); + WSREP_DEBUG("Enqueuing victim thread: %ld trx: %llu for abort", + victim_thread, + victim_trx_id); + } + + wsrep_unlock_rollback(); + wsrep_thd_UNLOCK(thd); + + break; + } + default: + { + WSREP_WARN("Victim thread: %ld trx: %llu in bad wsrep query state: %s", + victim_thread, + victim_trx_id, + wsrep_thd_query_state_str(thd)); + + wsrep_thd_UNLOCK(thd); + break; + } + } + + return(0); +} + +static void wsrep_process_kill(THD *thd) +{ + DBUG_ENTER("wsrep_process_kill"); + + mysql_mutex_lock(&LOCK_wsrep_kill); + + WSREP_DEBUG("WSREP killer thread started"); + + while (thd->killed == NOT_KILLED) + { + thd_proc_info(thd, "wsrep killer idle"); + thd->mysys_var->current_mutex= &LOCK_wsrep_kill; + thd->mysys_var->current_cond= &COND_wsrep_kill; + + mysql_cond_wait(&COND_wsrep_kill,&LOCK_wsrep_kill); + + WSREP_DEBUG("WSREP killer thread wakes for signal"); + + mysql_mutex_lock(&thd->mysys_var->mutex); + thd_proc_info(thd, "wsrep killer active"); + thd->mysys_var->current_mutex= 0; + thd->mysys_var->current_cond= 0; + mysql_mutex_unlock(&thd->mysys_var->mutex); + + /* process all entries in the queue */ + while (!wsrep_kill_list.empty()) + { + wsrep_kill_t to_be_killed= wsrep_kill_list.front(); + // Release list mutex while we kill one thread + mysql_mutex_unlock(&LOCK_wsrep_kill); + wsrep_kill(&to_be_killed); + mysql_mutex_lock(&LOCK_wsrep_kill); + wsrep_kill_list.pop_front(); + } + } + + assert(wsrep_kill_list.empty()); + + mysql_mutex_unlock(&LOCK_wsrep_kill); + sql_print_information("WSREP: killer thread exiting"); + DBUG_PRINT("wsrep",("wsrep killer thread exiting")); + DBUG_VOID_RETURN; +} + + +void wsrep_create_killer() +{ + wsrep_thread_args* arg; + if((arg = (wsrep_thread_args*)my_malloc(sizeof(wsrep_thread_args), MYF(0))) == NULL) { + WSREP_ERROR("Can't allocate memory for wsrep background killer thread"); + assert(0); + } + + arg->thread_type = WSREP_KILLER_THREAD; + arg->processor = wsrep_process_kill; + + if (create_wsrep_THD(arg, false)) { + WSREP_WARN("Can't create thread to manage wsrep background kill"); + my_free(arg); + return; + } +} + bool wsrep_create_appliers(long threads, bool thread_count_lock) { if (!wsrep_connected) @@ -657,7 +1031,7 @@ static void wsrep_rollback_process(THD *thd) mysql_mutex_unlock(&aborting->LOCK_thd_data); - set_current_thd(aborting); + set_current_thd(aborting); aborting->store_globals(); mysql_mutex_lock(&aborting->LOCK_thd_data); @@ -667,7 +1041,7 @@ static void wsrep_rollback_process(THD *thd) (longlong) aborting->real_id); mysql_mutex_unlock(&aborting->LOCK_thd_data); - set_current_thd(thd); + set_current_thd(thd); thd->store_globals(); mysql_mutex_lock(&LOCK_wsrep_rollback); diff --git a/sql/wsrep_thd.h b/sql/wsrep_thd.h index 10efcbefbf6..51700d31e63 100644 --- a/sql/wsrep_thd.h +++ b/sql/wsrep_thd.h @@ -29,6 +29,7 @@ void wsrep_replay_sp_transaction(THD* thd); void wsrep_replay_transaction(THD *thd); bool wsrep_create_appliers(long threads, bool thread_count_lock=false); void wsrep_create_rollbacker(); +void wsrep_create_killer(); int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal); diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc index 9777cc6ec62..8b2b7c13cdc 100644 --- a/sql/wsrep_var.cc +++ b/sql/wsrep_var.cc @@ -500,6 +500,7 @@ bool wsrep_cluster_address_update (sys_var *self, THD* thd, enum_var_type type) if (wsrep_start_replication()) { wsrep_create_rollbacker(); + wsrep_create_killer(); WSREP_DEBUG("Cluster address update creating %ld applier threads running %lu", wsrep_slave_threads, wsrep_running_applier_threads); wsrep_create_appliers(wsrep_slave_threads); diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index 3b404928cf4..f2e5106478d 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -4772,6 +4772,7 @@ innobase_rollback_trx( trx->will_lock = 0; #ifdef WITH_WSREP trx->wsrep = false; + trx->lock.was_chosen_as_wsrep_victim= false; #endif DBUG_RETURN(0); } @@ -5148,8 +5149,6 @@ static void innobase_kill_query(handlerton*, THD* thd, enum thd_kill_levels) /* if victim has been signaled by BF thread and/or aborting is already progressing, following query aborting is not necessary any more. - Also, BF thread should own trx mutex for the victim, which would - conflict with trx_mutex_enter() below */ DBUG_VOID_RETURN; } @@ -5185,8 +5184,9 @@ static void innobase_kill_query(handlerton*, THD* thd, enum thd_kill_levels) !trx->lock.was_chosen_as_deadlock_victim; trx_sys_mutex_exit(); if (!cancel); - else if (lock_t *lock= trx->lock.wait_lock) + else if (lock_t *lock= trx->lock.wait_lock) { lock_cancel_waiting_and_release(lock); + } lock_mutex_exit(); trx_mutex_exit(trx); } @@ -19522,248 +19522,88 @@ static struct st_mysql_storage_engine innobase_storage_engine= { MYSQL_HANDLERTON_INTERFACE_VERSION }; #ifdef WITH_WSREP -void -wsrep_abort_slave_trx( -/*==================*/ - wsrep_seqno_t bf_seqno, - wsrep_seqno_t victim_seqno) -{ - WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be " - "caused by:\n\t" - "1) unsupported configuration options combination, please check documentation.\n\t" - "2) a bug in the code.\n\t" - "3) a database corruption.\n Node consistency compromized, " - "need to abort. Restart the node to resync with cluster.", - (long long)bf_seqno, (long long)victim_seqno); - abort(); -} -/*******************************************************************//** -This function is used to kill one transaction in BF. */ + +/** This function is used to kill one transaction. + +This transaction was open on this node (not-yet-committed), and a +conflicting writeset from some other node that was being applied +caused a locking conflict. First committed (from other node) +wins, thus open transaction is rolled back. BF stands for +brute-force: any transaction can get aborted by galera any time +it is necessary. + +This conflict can happen only when the replicated writeset (from +other node) is being applied, not when it’s waiting in the queue. +If our local transaction reached its COMMIT and this conflicting +writeset was in the queue, then it should fail the local +certification test instead. + +A brute force abort is only triggered by a locking conflict +between a writeset being applied by an applier thread (slave thread) +and an open transaction on the node, not by a Galera writeset +comparison as in the local certification failure. + +@param[in] bf_thd Brute force (BF) thread +@param[in,out] victim_trx Vimtim trx to be killed +@param[in] signal Should victim be signaled */ UNIV_INTERN int wsrep_innobase_kill_one_trx( -/*========================*/ - void * const bf_thd_ptr, - const trx_t * const bf_trx, + THD* bf_thd, trx_t *victim_trx, - ibool signal) + my_bool signal) { - ut_ad(lock_mutex_own()); - ut_ad(trx_mutex_own(victim_trx)); - ut_ad(bf_thd_ptr); - ut_ad(victim_trx); + ut_ad(victim_trx); + ut_ad(lock_mutex_own()); + ut_ad(trx_mutex_own(victim_trx)); DBUG_ENTER("wsrep_innobase_kill_one_trx"); - THD *bf_thd = bf_thd_ptr ? (THD*) bf_thd_ptr : NULL; - THD *thd = (THD *) victim_trx->mysql_thd; - int64_t bf_seqno = (bf_thd) ? wsrep_thd_trx_seqno(bf_thd) : 0; - - if (!thd) { - DBUG_PRINT("wsrep", ("no thd for conflicting lock")); - WSREP_WARN("no THD for trx: " TRX_ID_FMT, victim_trx->id); - DBUG_RETURN(1); - } - - if (!bf_thd) { - DBUG_PRINT("wsrep", ("no BF thd for conflicting lock")); - WSREP_WARN("no BF THD for trx: " TRX_ID_FMT, - bf_trx ? bf_trx->id : 0); - DBUG_RETURN(1); - } - - WSREP_LOG_CONFLICT(bf_thd, thd, TRUE); - - WSREP_DEBUG("BF kill (" ULINTPF ", seqno: " INT64PF - "), victim: (%lu) trx: " TRX_ID_FMT, - signal, bf_seqno, - thd_get_thread_id(thd), - victim_trx->id); - WSREP_DEBUG("Aborting query: %s conf %d trx: %" PRId64, - (thd && wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void", - wsrep_thd_conflict_state(thd, FALSE), - wsrep_thd_ws_handle(thd)->trx_id); + THD *thd= (THD *) victim_trx->mysql_thd; + /* Note that bf_trx might not exists here e.g. on MDL conflict + case. See galera_concurrent_ctas test case */ + trx_t* bf_trx= thd_to_trx(bf_thd); + wsrep_kill_t item; - wsrep_thd_LOCK(thd); - DBUG_EXECUTE_IF("sync.wsrep_after_BF_victim_lock", - { - const char act[]= - "now " - "wait_for signal.wsrep_after_BF_victim_lock"; - DBUG_ASSERT(!debug_sync_set_action(bf_thd, - STRING_WITH_LEN(act))); - };); - - - if (wsrep_thd_query_state(thd) == QUERY_EXITING) { - WSREP_DEBUG("kill trx EXITING for " TRX_ID_FMT, - victim_trx->id); - wsrep_thd_UNLOCK(thd); - DBUG_RETURN(0); - } + ut_ad(bf_thd); + ut_ad(thd); - if (wsrep_thd_exec_mode(thd) != LOCAL_STATE) { - WSREP_DEBUG("withdraw for BF trx: " TRX_ID_FMT ", state: %d", - victim_trx->id, - wsrep_thd_get_conflict_state(thd)); - } + victim_trx->lock.was_chosen_as_wsrep_victim= true; + victim_trx->lock.was_chosen_as_deadlock_victim= false; + item.victim_thd_id = thd_get_thread_id(thd); + item.victim_trx_id = victim_trx->id; + item.bf_thd_id = thd_get_thread_id(bf_thd); + item.bf_trx_id = bf_trx ? bf_trx->id : TRX_ID_MAX; + item.signal = signal; + item.wait_lock = (victim_trx->lock.wait_lock ? true : false); - switch (wsrep_thd_get_conflict_state(thd)) { - case NO_CONFLICT: - wsrep_thd_set_conflict_state(thd, MUST_ABORT); - break; - case MUST_ABORT: - WSREP_DEBUG("victim " TRX_ID_FMT " in MUST ABORT state", - victim_trx->id); - wsrep_thd_UNLOCK(thd); - wsrep_thd_awake(thd, signal); - DBUG_RETURN(0); - break; - case ABORTED: - case ABORTING: // fall through - default: - WSREP_DEBUG("victim " TRX_ID_FMT " in state %d", - victim_trx->id, wsrep_thd_get_conflict_state(thd)); - wsrep_thd_UNLOCK(thd); - DBUG_RETURN(0); - break; + /* If victim itself is waiting a lock, cancel wait lock. */ + if (victim_trx->lock.wait_lock) { + lock_cancel_waiting_and_release(victim_trx->lock.wait_lock); } - switch (wsrep_thd_query_state(thd)) { - case QUERY_COMMITTING: - enum wsrep_status rcode; - - WSREP_DEBUG("kill query for: %ld", - thd_get_thread_id(thd)); - WSREP_DEBUG("kill trx QUERY_COMMITTING for " TRX_ID_FMT, - victim_trx->id); - - if (wsrep_thd_exec_mode(thd) == REPL_RECV) { - wsrep_abort_slave_trx(bf_seqno, - wsrep_thd_trx_seqno(thd)); - } else { - wsrep_t *wsrep= get_wsrep(); - rcode = wsrep->abort_pre_commit( - wsrep, bf_seqno, - (wsrep_trx_id_t)wsrep_thd_ws_handle(thd)->trx_id - ); - - switch (rcode) { - case WSREP_WARNING: - WSREP_DEBUG("cancel commit warning: " - TRX_ID_FMT, - victim_trx->id); - wsrep_thd_UNLOCK(thd); - wsrep_thd_awake(thd, signal); - DBUG_RETURN(1); - break; - case WSREP_OK: - break; - default: - WSREP_ERROR( - "cancel commit bad exit: %d " - TRX_ID_FMT, - rcode, victim_trx->id); - /* unable to interrupt, must abort */ - /* note: kill_mysql() will block, if we cannot. - * kill the lock holder first. - */ - abort(); - break; - } - } - wsrep_thd_UNLOCK(thd); - wsrep_thd_awake(thd, signal); - break; - case QUERY_EXEC: - /* it is possible that victim trx is itself waiting for some - * other lock. We need to cancel this waiting - */ - WSREP_DEBUG("kill trx QUERY_EXEC for " TRX_ID_FMT, - victim_trx->id); - - victim_trx->lock.was_chosen_as_deadlock_victim= TRUE; - - if (victim_trx->lock.wait_lock) { - WSREP_DEBUG("victim has wait flag: %ld", - thd_get_thread_id(thd)); - lock_t* wait_lock = victim_trx->lock.wait_lock; - - if (wait_lock) { - WSREP_DEBUG("canceling wait lock"); - victim_trx->lock.was_chosen_as_deadlock_victim= TRUE; - lock_cancel_waiting_and_release(wait_lock); - } - - wsrep_thd_UNLOCK(thd); - wsrep_thd_awake(thd, signal); - } else { - /* abort currently executing query */ - DBUG_PRINT("wsrep",("sending KILL_QUERY to: %lu", - thd_get_thread_id(thd))); - WSREP_DEBUG("kill query for: %ld", - thd_get_thread_id(thd)); - /* Note that innobase_kill_query will take lock_mutex - and trx_mutex */ - wsrep_thd_UNLOCK(thd); - wsrep_thd_awake(thd, signal); - - /* for BF thd, we need to prevent him from committing */ - if (wsrep_thd_exec_mode(thd) == REPL_RECV) { - wsrep_abort_slave_trx(bf_seqno, - wsrep_thd_trx_seqno(thd)); - } - } - break; - case QUERY_IDLE: - { - WSREP_DEBUG("kill IDLE for " TRX_ID_FMT, victim_trx->id); - - if (wsrep_thd_exec_mode(thd) == REPL_RECV) { - WSREP_DEBUG("kill BF IDLE, seqno: %lld", - (long long)wsrep_thd_trx_seqno(thd)); - wsrep_thd_UNLOCK(thd); - wsrep_abort_slave_trx(bf_seqno, - wsrep_thd_trx_seqno(thd)); - DBUG_RETURN(0); - } - /* This will lock thd from proceeding after net_read() */ - wsrep_thd_set_conflict_state(thd, ABORTING); - - wsrep_lock_rollback(); - - if (wsrep_aborting_thd_contains(thd)) { - WSREP_WARN("duplicate thd aborter %lu", - (ulong) thd_get_thread_id(thd)); - } else { - wsrep_aborting_thd_enqueue(thd); - DBUG_PRINT("wsrep",("enqueuing trx abort for %lu", - thd_get_thread_id(thd))); - WSREP_DEBUG("enqueuing trx abort for (%lu)", - thd_get_thread_id(thd)); - } - - DBUG_PRINT("wsrep",("signalling wsrep rollbacker")); - WSREP_DEBUG("signaling aborter"); - wsrep_unlock_rollback(); - wsrep_thd_UNLOCK(thd); - - break; - } - default: - WSREP_WARN("bad wsrep query state: %d", - wsrep_thd_query_state(thd)); - wsrep_thd_UNLOCK(thd); - break; - } + /* Actual processing of the victim kill is handled later + on background thread. At this point we may not hold + LOCK_thd_data mutex as we are already holding lock sys + and trx mutex. */ + (void)wsrep_enqueue_background_kill(item); DBUG_RETURN(0); } +/** + This function forces the victim transaction to abort. Aborting the + transaction does NOT end it, it still has to be rolled back. + + @param bf_thd brute force THD asking for the abort + @param victim_thd victim THD to be aborted + + @return 0 victim was aborted + @return -1 victim thread was aborted (no transaction) +*/ static int wsrep_abort_transaction( -/*====================*/ handlerton* hton, THD *bf_thd, THD *victim_thd, @@ -19771,10 +19611,12 @@ wsrep_abort_transaction( { DBUG_ENTER("wsrep_innobase_abort_thd"); + ut_a(bf_thd); + ut_a(victim_thd); + trx_t* victim_trx = thd_to_trx(victim_thd); - trx_t* bf_trx = (bf_thd) ? thd_to_trx(bf_thd) : NULL; - WSREP_DEBUG("abort transaction: BF: %s victim: %s victim conf: %d", + WSREP_DEBUG("Abort transaction: BF: %s victim: %s victim conf: %d", wsrep_thd_query(bf_thd), wsrep_thd_query(victim_thd), wsrep_thd_conflict_state(victim_thd, FALSE)); @@ -19782,8 +19624,9 @@ wsrep_abort_transaction( if (victim_trx) { lock_mutex_enter(); trx_mutex_enter(victim_trx); - int rcode = wsrep_innobase_kill_one_trx(bf_thd, bf_trx, - victim_trx, signal); + int rcode = wsrep_innobase_kill_one_trx(bf_thd, + victim_trx, + signal); lock_mutex_exit(); trx_mutex_exit(victim_trx); wsrep_srv_conc_cancel_wait(victim_trx); @@ -19792,7 +19635,6 @@ wsrep_abort_transaction( WSREP_DEBUG("victim does not have transaction"); wsrep_thd_LOCK(victim_thd); wsrep_thd_set_conflict_state(victim_thd, MUST_ABORT); - wsrep_thd_UNLOCK(victim_thd); wsrep_thd_awake(victim_thd, signal); } diff --git a/storage/innobase/include/ha_prototypes.h b/storage/innobase/include/ha_prototypes.h index 05dc3f57df7..3954b866622 100644 --- a/storage/innobase/include/ha_prototypes.h +++ b/storage/innobase/include/ha_prototypes.h @@ -235,10 +235,9 @@ innobase_casedn_str( #ifdef WITH_WSREP UNIV_INTERN int -wsrep_innobase_kill_one_trx(void * const thd_ptr, - const trx_t * const bf_trx, +wsrep_innobase_kill_one_trx(THD* bf_thd, trx_t *victim_trx, - ibool signal); + my_bool signal); int wsrep_innobase_mysql_sort(int mysql_type, uint charset_number, unsigned char* str, unsigned int str_length, unsigned int buf_length); diff --git a/storage/innobase/include/trx0trx.h b/storage/innobase/include/trx0trx.h index 4161d4d8563..7a78208569f 100644 --- a/storage/innobase/include/trx0trx.h +++ b/storage/innobase/include/trx0trx.h @@ -34,6 +34,7 @@ Created 3/26/1996 Heikki Tuuri #include "trx0xa.h" #include "ut0vec.h" #include "fts0fts.h" +#include "read0read.h" #include <vector> #include <set> @@ -543,28 +544,11 @@ Check transaction state */ ut_error; \ } while (0) -/** Check if transaction is free so that it can be re-initialized. -@param t transaction handle */ -#define assert_trx_is_free(t) do { \ - ut_ad(trx_state_eq((t), TRX_STATE_NOT_STARTED)); \ - ut_ad(!(t)->id); \ - ut_ad(!(t)->has_logged()); \ - ut_ad(!(t)->is_referenced()); \ - ut_ad(!(t)->is_wsrep()); \ - ut_ad(!MVCC::is_view_active((t)->read_view)); \ - ut_ad((t)->lock.wait_thr == NULL); \ - ut_ad(UT_LIST_GET_LEN((t)->lock.trx_locks) == 0); \ - ut_ad((t)->lock.table_locks.empty()); \ - ut_ad(!(t)->autoinc_locks \ - || ib_vector_is_empty((t)->autoinc_locks)); \ - ut_ad((t)->dict_operation == TRX_DICT_OP_NONE); \ -} while(0) - /** Check if transaction is in-active so that it can be freed and put back to transaction pool. @param t transaction handle */ #define assert_trx_is_inactive(t) do { \ - assert_trx_is_free((t)); \ + t->assert_freed(); \ ut_ad((t)->dict_operation_lock_mode == 0); \ } while(0) @@ -656,6 +640,11 @@ struct trx_lock_t { lock_sys->mutex. Otherwise, this may only be modified by the thread that is serving the running transaction. */ +#ifdef WITH_WSREP + bool was_chosen_as_wsrep_victim; + /*!< high priority wsrep thread has + marked this trx to abort */ +#endif /* WITH_WSREP */ /** Pre-allocated record locks */ struct { @@ -1205,7 +1194,25 @@ public: /** Free the memory to trx_pools */ inline void free(); - + /** Check if transaction is free so that it can be re-initialized. */ + void assert_freed() + { + ut_ad(state == TRX_STATE_NOT_STARTED); + ut_ad(!id); + ut_ad(!has_logged()); + ut_ad(!is_referenced()); + ut_ad(!is_wsrep()); +#ifdef WITH_WSREP + ut_ad(!lock.was_chosen_as_wsrep_victim); +#endif + ut_ad(!MVCC::is_view_active(read_view)); + ut_ad(lock.wait_thr == NULL); + ut_ad(UT_LIST_GET_LEN(lock.trx_locks) == 0); + ut_ad(lock.table_locks.empty()); + ut_ad(!autoinc_locks + || ib_vector_is_empty(autoinc_locks)); + ut_ad(dict_operation == TRX_DICT_OP_NONE); + } private: /** Assign a rollback segment for modifying temporary tables. @return the assigned rollback segment */ diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc index 4730ae53133..acfe9b69ac5 100644 --- a/storage/innobase/lock/lock0lock.cc +++ b/storage/innobase/lock/lock0lock.cc @@ -1149,7 +1149,7 @@ wsrep_kill_victim( } wsrep_innobase_kill_one_trx(trx->mysql_thd, - trx, lock->trx, TRUE); + lock->trx, true); } } } diff --git a/storage/innobase/trx/trx0roll.cc b/storage/innobase/trx/trx0roll.cc index c5f70452bf2..bbec99e9135 100644 --- a/storage/innobase/trx/trx0roll.cc +++ b/storage/innobase/trx/trx0roll.cc @@ -191,6 +191,7 @@ dberr_t trx_rollback_for_mysql(trx_t* trx) ut_ad(trx->in_mysql_trx_list); #ifdef WITH_WSREP trx->wsrep = false; + trx->lock.was_chosen_as_wsrep_victim= false; #endif return(DB_SUCCESS); @@ -412,9 +413,7 @@ trx_rollback_to_savepoint_for_mysql_low( trx->op_info = ""; #ifdef WITH_WSREP - if (trx->is_wsrep()) { - trx->lock.was_chosen_as_deadlock_victim = false; - } + trx->lock.was_chosen_as_wsrep_victim= false; #endif return(err); } diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc index 13b4efb973b..7b6102e4309 100644 --- a/storage/innobase/trx/trx0trx.cc +++ b/storage/innobase/trx/trx0trx.cc @@ -330,8 +330,7 @@ trx_t *trx_allocate_for_background() MEM_CHECK_DEFINED() in trx_t::free(). */ MEM_MAKE_DEFINED(trx, sizeof *trx); #endif - - assert_trx_is_free(trx); + trx->assert_freed(); mem_heap_t* heap; ib_alloc_t* alloc; @@ -1857,15 +1856,14 @@ trx_commit_in_memory( trx->state = TRX_STATE_NOT_STARTED; #ifdef WITH_WSREP trx->wsrep = false; + trx->lock.was_chosen_as_wsrep_victim= false; #endif /* trx->in_mysql_trx_list would hold between trx_allocate_for_mysql() and trx_free_for_mysql(). It does not hold for recovered transactions or system transactions. */ - assert_trx_is_free(trx); - + trx->assert_freed(); trx_init(trx); - trx_mutex_exit(trx); ut_a(trx->error_state == DB_SUCCESS); |