diff options
-rw-r--r-- | include/mysql/service_wsrep.h | 8 | ||||
-rw-r--r-- | mysql-test/suite/galera/r/galera_bf_kill.result | 120 | ||||
-rw-r--r-- | mysql-test/suite/galera/t/MW-388.test | 2 | ||||
-rw-r--r-- | mysql-test/suite/galera/t/MW-86-wait1.test | 1 | ||||
-rw-r--r-- | mysql-test/suite/galera/t/MW-86-wait8.test | 1 | ||||
-rw-r--r-- | mysql-test/suite/galera/t/galera_bf_kill.test | 273 | ||||
-rw-r--r-- | mysql-test/suite/galera/t/galera_bf_lock_wait.test | 1 | ||||
-rw-r--r-- | mysql-test/suite/galera/t/galera_query_cache_sync_wait.test | 1 | ||||
-rw-r--r-- | mysql-test/suite/galera/t/galera_var_retry_autocommit.test | 1 | ||||
-rw-r--r-- | sql/sql_class.cc | 5 | ||||
-rw-r--r-- | sql/sql_class.h | 10 | ||||
-rw-r--r-- | sql/sql_parse.cc | 20 | ||||
-rw-r--r-- | sql/sql_plugin_services.ic | 3 | ||||
-rw-r--r-- | sql/wsrep_dummy.cc | 9 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 22 | ||||
-rw-r--r-- | storage/innobase/handler/ha_innodb.cc | 378 | ||||
-rw-r--r-- | storage/innobase/include/ha_prototypes.h | 9 | ||||
-rw-r--r-- | storage/innobase/lock/lock0lock.cc | 2 | ||||
-rw-r--r-- | storage/xtradb/handler/ha_innodb.cc | 396 | ||||
-rw-r--r-- | storage/xtradb/include/ha_prototypes.h | 9 | ||||
-rw-r--r-- | storage/xtradb/lock/lock0lock.cc | 2 |
21 files changed, 954 insertions, 319 deletions
diff --git a/include/mysql/service_wsrep.h b/include/mysql/service_wsrep.h index 499fbf2c173..7b943dedff8 100644 --- a/include/mysql/service_wsrep.h +++ b/include/mysql/service_wsrep.h @@ -99,7 +99,7 @@ extern struct wsrep_service_st { enum wsrep_conflict_state (*wsrep_thd_get_conflict_state_func)(MYSQL_THD); my_bool (*wsrep_thd_is_BF_func)(MYSQL_THD , my_bool); my_bool (*wsrep_thd_is_wsrep_func)(MYSQL_THD thd); - char * (*wsrep_thd_query_func)(THD *thd); + const char * (*wsrep_thd_query_func)(THD *thd); enum wsrep_query_state (*wsrep_thd_query_state_func)(THD *thd); const char * (*wsrep_thd_query_state_str_func)(THD *thd); int (*wsrep_thd_retry_counter_func)(THD *thd); @@ -112,6 +112,7 @@ extern struct wsrep_service_st { int (*wsrep_trx_order_before_func)(MYSQL_THD, MYSQL_THD); void (*wsrep_unlock_rollback_func)(); void (*wsrep_set_data_home_dir_func)(const char *data_dir); + bool (*wsrep_thd_set_wsrep_killed_func)(MYSQL_THD thd); } *wsrep_service; #ifdef MYSQL_DYNAMIC_PLUGIN @@ -163,6 +164,7 @@ extern struct wsrep_service_st { #define wsrep_drupal_282555_workaround get_wsrep_drupal_282555_workaround() #define wsrep_recovery get_wsrep_recovery() #define wsrep_protocol_version get_wsrep_protocol_version() +#define wsrep_thd_set_wsrep_killed(T) wsrep_service->wsrep_thd_set_wsrep_killed_func(T) #else @@ -176,7 +178,7 @@ extern long wsrep_protocol_version; bool wsrep_consistency_check(THD *thd); bool wsrep_prepare_key(const unsigned char* cache_key, size_t cache_key_len, const unsigned char* row_id, size_t row_id_len, struct wsrep_buf* key, size_t* key_len); -char *wsrep_thd_query(THD *thd); +const char *wsrep_thd_query(THD *thd); const char *wsrep_thd_conflict_state_str(THD *thd); const char *wsrep_thd_exec_mode_str(THD *thd); const char *wsrep_thd_query_state_str(THD *thd); @@ -214,8 +216,10 @@ void wsrep_thd_set_conflict_state(THD *thd, enum wsrep_conflict_state state); bool wsrep_thd_ignore_table(THD *thd); void wsrep_unlock_rollback(); void wsrep_set_data_home_dir(const char *data_dir); +bool wsrep_thd_set_wsrep_killed(MYSQL_THD thd); #endif +/* set wsrep_killed flag for the target THD */ #ifdef __cplusplus } diff --git a/mysql-test/suite/galera/r/galera_bf_kill.result b/mysql-test/suite/galera/r/galera_bf_kill.result new file mode 100644 index 00000000000..fd3a728902c --- /dev/null +++ b/mysql-test/suite/galera/r/galera_bf_kill.result @@ -0,0 +1,120 @@ +connection node_2; +connection node_1; +connection node_2; +CREATE TABLE t1(a int not null primary key auto_increment,b int) engine=InnoDB; +insert into t1 values (NULL,1); +connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2a; +begin; +update t1 set a = 5; +connection node_2; +select * from t1; +a b +2 1 +disconnect node_2a; +connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2a; +begin; +update t1 set a =5; +connection node_2; +select * from t1; +a b +2 1 +disconnect node_2a; +connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2a; +begin; +update t1 set a =5, b=2; +connection node_2; +ALTER TABLE t1 ADD UNIQUE KEY b1(b); +ALTER TABLE t1 DROP KEY b1; +select * from t1; +a b +2 1 +disconnect node_2a; +connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2a; +begin; +update t1 set a =5, b=2; +connect node_2b, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2b; +begin; +update t1 set a =6, b=7; +connection node_2; +ALTER TABLE t1 ADD UNIQUE KEY b2(b); +ALTER TABLE t1 DROP KEY b2; +select * from t1; +a b +2 1 +disconnect node_2a; +disconnect node_2b; +connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2a; +SET SESSION wsrep_on=OFF; +begin; +update t1 set a =5, b=2; +connection node_2; +ALTER TABLE t1 ADD UNIQUE KEY b3(b); +select * from t1; +a b +2 1 +disconnect node_2a; +connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2a; +SET SESSION wsrep_on=OFF; +begin; +update t1 set a =5, b=2; +connection node_2; +select * from t1; +a b +2 1 +disconnect node_2a; +connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2a; +truncate t1; +insert into t1 values (1,0); +begin; +update t1 set b=2 where a=1; +connection node_2; +set session wsrep_sync_wait=0; +connect node_2b, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2b; +SET GLOBAL debug_dbug = "d,sync.before_wsrep_thd_abort"; +connection node_1; +select * from t1; +a b +1 0 +update t1 set b= 1 where a=1; +connection node_2b; +SET SESSION DEBUG_SYNC = "now WAIT_FOR sync.before_wsrep_thd_abort_reached"; +connection node_2; +SET DEBUG_SYNC= 'before_awake_no_mutex SIGNAL awake_reached WAIT_FOR continue_kill'; +connection node_2b; +SET DEBUG_SYNC='now WAIT_FOR awake_reached'; +SET GLOBAL debug_dbug = ""; +SET DEBUG_SYNC = "now SIGNAL signal.before_wsrep_thd_abort"; +SET DEBUG_SYNC = "now SIGNAL continue_kill"; +connection node_2; +connection node_2a; +select * from t1; +connection node_2; +SET DEBUG_SYNC = "RESET"; +drop table t1; +disconnect node_2a; +connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2; +connection node_2a; +CREATE TABLE t1 (i int primary key); +SET DEBUG_SYNC = "before_wsrep_ordered_commit SIGNAL bwoc_reached WAIT_FOR bwoc_continue"; +INSERT INTO t1 VALUES (1); +connection node_2; +SET DEBUG_SYNC = "now WAIT_FOR bwoc_reached"; +SET DEBUG_SYNC = "now SIGNAL bwoc_continue"; +SET DEBUG_SYNC='RESET'; +connection node_2a; +connection node_2; +select * from t1; +i +1 +disconnect node_2a; +connection node_2; +drop table t1; diff --git a/mysql-test/suite/galera/t/MW-388.test b/mysql-test/suite/galera/t/MW-388.test index fafdde092bf..2d3fe5b5d93 100644 --- a/mysql-test/suite/galera/t/MW-388.test +++ b/mysql-test/suite/galera/t/MW-388.test @@ -1,5 +1,7 @@ --source include/galera_cluster.inc +--source include/have_debug.inc --source include/have_debug_sync.inc +--source include/galera_have_debug_sync.inc --connection node_1 CREATE TABLE t1 (f1 INTEGER PRIMARY KEY, f2 CHAR(255)) Engine=InnoDB; diff --git a/mysql-test/suite/galera/t/MW-86-wait1.test b/mysql-test/suite/galera/t/MW-86-wait1.test index a7476b74e68..c5b78be64e5 100644 --- a/mysql-test/suite/galera/t/MW-86-wait1.test +++ b/mysql-test/suite/galera/t/MW-86-wait1.test @@ -7,6 +7,7 @@ --source include/have_binlog_format_row.inc --source include/have_debug.inc --source include/have_debug_sync.inc +--source include/galera_have_debug_sync.inc --connection node_2 # Make sure no signals have been leftover from previous tests to surprise us. diff --git a/mysql-test/suite/galera/t/MW-86-wait8.test b/mysql-test/suite/galera/t/MW-86-wait8.test index 551b0f67b7c..809f5047c51 100644 --- a/mysql-test/suite/galera/t/MW-86-wait8.test +++ b/mysql-test/suite/galera/t/MW-86-wait8.test @@ -4,6 +4,7 @@ --source include/galera_cluster.inc --source include/have_binlog_format_row.inc --source include/have_debug_sync.inc +--source include/galera_have_debug_sync.inc --connection node_2 # Make sure no signals have been leftover from previous tests to surprise us. diff --git a/mysql-test/suite/galera/t/galera_bf_kill.test b/mysql-test/suite/galera/t/galera_bf_kill.test new file mode 100644 index 00000000000..f09a4ba52db --- /dev/null +++ b/mysql-test/suite/galera/t/galera_bf_kill.test @@ -0,0 +1,273 @@ +--source include/galera_cluster.inc +--source include/have_innodb.inc +--source include/have_debug.inc +--source include/have_debug_sync.inc + +# +# Test case 1: Start a transaction on node_2a and kill it +# from other connection on same node +# + +--connection node_2 +CREATE TABLE t1(a int not null primary key auto_increment,b int) engine=InnoDB; +insert into t1 values (NULL,1); + +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +begin; +update t1 set a = 5; + +--connection node_2 + +--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1 +--source include/wait_condition.inc + +--let $k_thread = `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1` + +--disable_query_log +--eval KILL $k_thread +--enable_query_log + +select * from t1; +--disconnect node_2a + +# +# Test case 2: Start a transaction on node_2a and use +# kill query from other connection on same node +# + +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +begin; +update t1 set a =5; + +--connection node_2 +--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1 +--source include/wait_condition.inc + +--let $k_thread = `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1` + +--disable_query_log +--eval KILL QUERY $k_thread +--enable_query_log + +select * from t1; +--disconnect node_2a +# +# Test case 3: Start a transaction on node_2a and start a DDL on other transaction +# that will then abort node_2a transaction +# +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +begin; +update t1 set a =5, b=2; + +--connection node_2 +ALTER TABLE t1 ADD UNIQUE KEY b1(b); +ALTER TABLE t1 DROP KEY b1; + +select * from t1; + +--disconnect node_2a + +# +# Test case 4: Start a transaction on node_2a and conflicting transaction on node_2b +# and start a DDL on other transaction that will then abort node_2a and node_2b +# transactions +# + +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +begin; +update t1 set a =5, b=2; + +--connect node_2b, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2b +begin; +send update t1 set a =6, b=7; + +--connection node_2 +ALTER TABLE t1 ADD UNIQUE KEY b2(b); +ALTER TABLE t1 DROP KEY b2; + +select * from t1; + +--disconnect node_2a +--disconnect node_2b + +# +# Test case 5: Start a transaction on node_2a with wsrep disabled +# and start a DDL on other transaction that will then abort node_2a +# transactions +# + +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +SET SESSION wsrep_on=OFF; +begin; +update t1 set a =5, b=2; + +--connection node_2 +ALTER TABLE t1 ADD UNIQUE KEY b3(b); + +select * from t1; + +--disconnect node_2a + +# +# Test case 6: Start a transaction on node_2a with wsrep disabled +# and kill it from other connection on same node +# + +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +SET SESSION wsrep_on=OFF; +begin; +update t1 set a =5, b=2; + +--connection node_2 +--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1 +--source include/wait_condition.inc + +--let $k_thread = `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1` + +--disable_query_log +--eval KILL $k_thread +--enable_query_log + + +select * from t1; + +--disconnect node_2a + +# +# Test case 7: +# 1. Start a transaction on node_2, +# and leave it pending while holding a row locked +# 2. set sync point pause applier +# 3. send a conflicting write on node_1, it will pause +# at the sync point +# 4. though another connection to node_2, kill the local +# transaction +# + +# +# connection node_2a runs a local transaction, that is victim of BF abort +# and victim of KILL command by connection node_2 +# +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +truncate t1; +insert into t1 values (1,0); + +# start a transaction that will conflict with later applier +begin; +update t1 set b=2 where a=1; + +--connection node_2 +set session wsrep_sync_wait=0; +--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1 +--source include/wait_condition.inc + +--let $k_thread = `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1` + +# connection node_2b is for controlling debug syn points +# first set a sync point for applier, to pause during BF aborting +# and before THD::awake would be called +# +--connect node_2b, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2b +SET GLOBAL debug_dbug = "d,sync.before_wsrep_thd_abort"; + +# +# replicate an update, which will BF abort the victim node_2a +# however, while applier in node 2 is handling the abort, +# it will pause in sync point set by node_2b +# +--connection node_1 +select * from t1; +update t1 set b= 1 where a=1; + +# +# wait until the applying of above update has reached the sync point +# in node 2 +# +--connection node_2b +SET SESSION DEBUG_SYNC = "now WAIT_FOR sync.before_wsrep_thd_abort_reached"; + +--connection node_2 +# +# pause KILL execution before awake +# +SET DEBUG_SYNC= 'before_awake_no_mutex SIGNAL awake_reached WAIT_FOR continue_kill'; +--disable_query_log +--send_eval KILL $k_thread +--enable_query_log + + +--connection node_2b +SET DEBUG_SYNC='now WAIT_FOR awake_reached'; + +# release applier and KILL operator +SET GLOBAL debug_dbug = ""; +SET DEBUG_SYNC = "now SIGNAL signal.before_wsrep_thd_abort"; +SET DEBUG_SYNC = "now SIGNAL continue_kill"; + +--connection node_2 +--reap + +--connection node_2a +--error 0,1213 +select * from t1; + +--connection node_2 +SET DEBUG_SYNC = "RESET"; + +drop table t1; + +--disconnect node_2a +# +# Test case 7: +# run a transaction in node 2, and set a sync point to pause the transaction +# in commit phase. +# Through another connection to node 2, kill the committing transaction by +# KILL QUERY command +# + +--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2 +--connection node_2a +--let $connection_id = `SELECT CONNECTION_ID()` + +CREATE TABLE t1 (i int primary key); + +# Set up sync point +SET DEBUG_SYNC = "before_wsrep_ordered_commit SIGNAL bwoc_reached WAIT_FOR bwoc_continue"; + +# Send insert which will block in the sync point above +--send INSERT INTO t1 VALUES (1) + +--connection node_2 +SET DEBUG_SYNC = "now WAIT_FOR bwoc_reached"; + +--disable_query_log +--disable_result_log +# victim has passed the point of no return, kill is not possible anymore +--eval KILL QUERY $connection_id +--enable_result_log +--enable_query_log + +SET DEBUG_SYNC = "now SIGNAL bwoc_continue"; +SET DEBUG_SYNC='RESET'; +--connection node_2a +--error 0,1213 +--reap + +--connection node_2 +# victim was able to complete the INSERT +select * from t1; + +--disconnect node_2a + +--connection node_2 +drop table t1; + diff --git a/mysql-test/suite/galera/t/galera_bf_lock_wait.test b/mysql-test/suite/galera/t/galera_bf_lock_wait.test index e3a9077a888..7a036ee4781 100644 --- a/mysql-test/suite/galera/t/galera_bf_lock_wait.test +++ b/mysql-test/suite/galera/t/galera_bf_lock_wait.test @@ -1,6 +1,7 @@ --source include/galera_cluster.inc --source include/big_test.inc +--connection node_1 CREATE TABLE t1 ENGINE=InnoDB select 1 as a, 1 as b union select 2, 2; ALTER TABLE t1 add primary key(a); diff --git a/mysql-test/suite/galera/t/galera_query_cache_sync_wait.test b/mysql-test/suite/galera/t/galera_query_cache_sync_wait.test index e13e7f1f748..dbe50a8b93c 100644 --- a/mysql-test/suite/galera/t/galera_query_cache_sync_wait.test +++ b/mysql-test/suite/galera/t/galera_query_cache_sync_wait.test @@ -1,6 +1,7 @@ --source include/galera_cluster.inc --source include/have_debug_sync.inc --source include/have_query_cache.inc +--source include/galera_have_debug_sync.inc CREATE TABLE t1 (id INT PRIMARY KEY AUTO_INCREMENT) ENGINE=InnoDB; INSERT INTO t1 VALUES (1); diff --git a/mysql-test/suite/galera/t/galera_var_retry_autocommit.test b/mysql-test/suite/galera/t/galera_var_retry_autocommit.test index 142f02546b4..496219711e9 100644 --- a/mysql-test/suite/galera/t/galera_var_retry_autocommit.test +++ b/mysql-test/suite/galera/t/galera_var_retry_autocommit.test @@ -5,6 +5,7 @@ --source include/galera_cluster.inc --source include/have_innodb.inc --source include/have_debug_sync.inc +--source include/galera_have_debug_sync.inc --connect node_1a, 127.0.0.1, root, , test, $NODE_MYPORT_1 diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 6897a26bda1..85435203a34 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -798,7 +798,8 @@ THD::THD(bool is_wsrep_applier) wsrep_po_handle(WSREP_PO_INITIALIZER), wsrep_po_cnt(0), wsrep_apply_format(0), - wsrep_ignore_table(false) + wsrep_ignore_table(false), + wsrep_killed(false) #endif { ulong tmp; @@ -1349,6 +1350,7 @@ void THD::init(void) wsrep_affected_rows = 0; wsrep_replicate_GTID = false; wsrep_skip_wsrep_GTID = false; + wsrep_killed = false; #endif /* WITH_WSREP */ if (variables.sql_log_bin) @@ -1985,7 +1987,6 @@ int THD::killed_errno() DBUG_RETURN(0); // Keep compiler happy } - /* Remember the location of thread info, the structure needed for sql_alloc() and the structure for the net buffer diff --git a/sql/sql_class.h b/sql/sql_class.h index 7ca3896a69d..0fb563572e0 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -3025,7 +3025,6 @@ public: void close_active_vio(); #endif void awake(killed_state state_to_set); - /** Disconnect the associated communication endpoint. */ void disconnect(); @@ -3539,6 +3538,14 @@ public: killed_err= 0; mysql_mutex_unlock(&LOCK_thd_kill); } + +#ifdef WITH_WSREP + mysql_mutex_assert_not_owner(&LOCK_thd_data); + mysql_mutex_lock(&LOCK_thd_data); + wsrep_killed= false; + mysql_mutex_unlock(&LOCK_thd_data); +#endif /* WITH_WSREP */ + } inline void reset_kill_query() { @@ -4152,6 +4159,7 @@ public: */ bool wsrep_ignore_table; wsrep_gtid_t wsrep_sync_wait_gtid; + bool wsrep_killed; // protected by LOCK_thd_data ulong wsrep_affected_rows; bool wsrep_replicate_GTID; bool wsrep_skip_wsrep_GTID; diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 656da3b6a79..765cca31e81 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -8324,8 +8324,24 @@ kill_one_thread(THD *thd, longlong id, killed_state kill_signal, killed_type typ thd->security_ctx->user_matches(tmp->security_ctx)) && !wsrep_thd_is_BF(tmp, false)) { - tmp->awake(kill_signal); - error=0; +#ifdef WITH_WSREP + DEBUG_SYNC(thd, "before_awake_no_mutex"); + + // Note that find_thread_by_id will lock tmp->LOCK_thd_data + if (wsrep_thd_set_wsrep_killed(tmp)) + { + WSREP_DEBUG("Kill transaction thread %llu skipped due to wsrep_killed", tmp->thread_id); + error= 0; + } + else +#endif /* WITH_WSREP */ + { + WSREP_DEBUG("kill_one_thread %llu, victim: %llu wsrep_killed %d by signal %d", + thd->thread_id, id, tmp->wsrep_killed, kill_signal); + tmp->awake(kill_signal); + WSREP_DEBUG("victim: %llu taken care of", id); + error= 0; + } } else error= (type == KILL_TYPE_QUERY ? ER_KILL_QUERY_DENIED_ERROR : diff --git a/sql/sql_plugin_services.ic b/sql/sql_plugin_services.ic index 6cf2a31f76b..96052668bf9 100644 --- a/sql/sql_plugin_services.ic +++ b/sql/sql_plugin_services.ic @@ -181,7 +181,8 @@ static struct wsrep_service_st wsrep_handler = { wsrep_trx_is_aborting, wsrep_trx_order_before, wsrep_unlock_rollback, - wsrep_set_data_home_dir + wsrep_set_data_home_dir, + wsrep_thd_set_wsrep_killed }; static struct thd_specifics_service_st thd_specifics_handler= diff --git a/sql/wsrep_dummy.cc b/sql/wsrep_dummy.cc index 364ef2d3e7a..0041eec084c 100644 --- a/sql/wsrep_dummy.cc +++ b/sql/wsrep_dummy.cc @@ -1,4 +1,4 @@ -/* Copyright (C) 2014 SkySQL Ab. +/* Copyright (C) 2014, 2020, MariaDB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -101,8 +101,8 @@ enum wsrep_conflict_state wsrep_thd_get_conflict_state(THD *) my_bool wsrep_thd_is_wsrep(THD *) { return 0; } -char *wsrep_thd_query(THD *) -{ return 0; } +const char *wsrep_thd_query(THD *) +{ return "NULL"; } enum wsrep_query_state wsrep_thd_query_state(THD *) { return QUERY_IDLE; } @@ -145,3 +145,6 @@ void wsrep_set_data_home_dir(const char *) void wsrep_log(void (*)(const char *, ...), const char *, ...) { } + +bool wsrep_thd_set_wsrep_killed(THD*) +{ return false;} diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 38f8ca413db..af6b3deca78 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -2601,12 +2601,14 @@ wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd) void wsrep_thd_LOCK(THD *thd) { + mysql_mutex_assert_not_owner(&thd->LOCK_thd_data); mysql_mutex_lock(&thd->LOCK_thd_data); } void wsrep_thd_UNLOCK(THD *thd) { + mysql_mutex_assert_owner(&thd->LOCK_thd_data); mysql_mutex_unlock(&thd->LOCK_thd_data); } @@ -2634,9 +2636,9 @@ extern "C" query_id_t wsrep_thd_query_id(THD *thd) } -char *wsrep_thd_query(THD *thd) +const char *wsrep_thd_query(THD *thd) { - return (thd) ? thd->query() : NULL; + return (thd) ? thd->query() : "NULL"; } @@ -2654,11 +2656,10 @@ extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id) extern "C" void wsrep_thd_awake(THD *thd, my_bool signal) { + mysql_mutex_assert_owner(&thd->LOCK_thd_data); if (signal) { - mysql_mutex_lock(&thd->LOCK_thd_data); thd->awake(KILL_QUERY); - mysql_mutex_unlock(&thd->LOCK_thd_data); } else { @@ -2666,6 +2667,8 @@ extern "C" void wsrep_thd_awake(THD *thd, my_bool signal) mysql_cond_broadcast(&COND_wsrep_replaying); mysql_mutex_unlock(&LOCK_wsrep_replaying); } + + mysql_mutex_unlock(&thd->LOCK_thd_data); } @@ -2930,3 +2933,14 @@ bool wsrep_node_is_synced() { return (WSREP_ON) ? (wsrep_config_state.get_status() == 4) : false; } + +bool wsrep_thd_set_wsrep_killed(THD *thd) +{ + mysql_mutex_assert_owner(&thd->LOCK_thd_data); + if (thd->wsrep_killed) + { + return true; + } + thd->wsrep_killed= true; + return false; +} diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index 2dd20927545..5a870853cf6 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -18542,235 +18542,323 @@ static struct st_mysql_storage_engine innobase_storage_engine= { MYSQL_HANDLERTON_INTERFACE_VERSION }; #ifdef WITH_WSREP -void -wsrep_abort_slave_trx(wsrep_seqno_t bf_seqno, wsrep_seqno_t victim_seqno) -{ - WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be " - "caused by:\n\t" - "1) unsupported configuration options combination, please check documentation.\n\t" - "2) a bug in the code.\n\t" - "3) a database corruption.\n Node consistency compromized, " - "need to abort. Restart the node to resync with cluster.", - (long long)bf_seqno, (long long)victim_seqno); - abort(); -} -/*******************************************************************//** -This function is used to kill one transaction in BF. */ +static void wsrep_abort_slave_trx(long long bf_seqno, long long victim_seqno) +{ + WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be " + "caused by:\n\t" + "1) unsupported configuration options combination, please check documentation.\n\t" + "2) a bug in the code.\n\t" + "3) a database corruption.\n Node consistency compromized, " + "need to abort. Restart the node to resync with cluster.", + bf_seqno, victim_seqno); + abort(); +} + +/** This function is used to kill one transaction. + +This transaction was open on this node (not-yet-committed), and a +conflicting writeset from some other node that was being applied +caused a locking conflict. First committed (from other node) +wins, thus open transaction is rolled back. BF stands for +brute-force: any transaction can get aborted by galera any time +it is necessary. + +This conflict can happen only when the replicated writeset (from +other node) is being applied, not when it’s waiting in the queue. +If our local transaction reached its COMMIT and this conflicting +writeset was in the queue, then it should fail the local +certification test instead. + +A brute force abort is only triggered by a locking conflict +between a writeset being applied by an applier thread (slave thread) +and an open transaction on the node, not by a Galera writeset +comparison as in the local certification failure. + +@param[in] bf_thd Brute force (BF) thread +@param[in,out] victim_trx Vimtim trx to be killed +@param[in] signal Should victim be signaled */ UNIV_INTERN int wsrep_innobase_kill_one_trx( - void * const bf_thd_ptr, - const trx_t * const bf_trx, + THD* bf_thd, trx_t *victim_trx, - ibool signal) + bool signal) { - ut_ad(lock_mutex_own()); - ut_ad(trx_mutex_own(victim_trx)); - ut_ad(bf_thd_ptr); - ut_ad(victim_trx); + ut_ad(bf_thd); + ut_ad(victim_trx); + ut_ad(lock_mutex_own()); + ut_ad(trx_mutex_own(victim_trx)); DBUG_ENTER("wsrep_innobase_kill_one_trx"); - THD *bf_thd = bf_thd_ptr ? (THD*) bf_thd_ptr : NULL; - THD *thd = (THD *) victim_trx->mysql_thd; - int64_t bf_seqno = (bf_thd) ? wsrep_thd_trx_seqno(bf_thd) : 0; - if (!thd) { - DBUG_PRINT("wsrep", ("no thd for conflicting lock")); - WSREP_WARN("no THD for trx: " TRX_ID_FMT, victim_trx->id); - DBUG_RETURN(1); - } + THD *thd= (THD *) victim_trx->mysql_thd; + ut_ad(thd); + /* Note that bf_trx might not exist here e.g. on MDL conflict + case (test: galera_concurrent_ctas). Similarly, BF thread + could be also acquiring MDL-lock causing victim to be + aborted. However, we have not yet called innobase_trx_init() + for BF transaction (test: galera_many_columns) */ + trx_t* bf_trx= thd_to_trx(bf_thd); - if (!bf_thd) { - DBUG_PRINT("wsrep", ("no BF thd for conflicting lock")); - WSREP_WARN("no BF THD for trx: " TRX_ID_FMT, - bf_trx ? bf_trx->id : 0); - DBUG_RETURN(1); - } + DBUG_EXECUTE_IF("sync.before_wsrep_thd_abort", + { + const char act[]= + "now " + "SIGNAL sync.before_wsrep_thd_abort_reached " + "WAIT_FOR signal.before_wsrep_thd_abort"; + DBUG_ASSERT(!debug_sync_set_action(bf_thd, + STRING_WITH_LEN(act))); + };); - WSREP_LOG_CONFLICT(bf_thd, thd, TRUE); + wsrep_thd_LOCK(thd); - WSREP_DEBUG("BF kill (%lu, seqno: %lld), victim: (%lu) trx: " - TRX_ID_FMT, - signal, (long long)bf_seqno, - thd_get_thread_id(thd), - victim_trx->id); + if (wsrep_thd_set_wsrep_killed(thd)) + { + WSREP_DEBUG("innodb kill transaction skipped due to wsrep_killed"); + wsrep_thd_UNLOCK(thd); + DBUG_RETURN(0); + } - WSREP_DEBUG("Aborting query: %s conf %d trx: %" PRId64, - (thd && wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void", - wsrep_thd_conflict_state(thd, FALSE), - wsrep_thd_ws_handle(thd)->trx_id); - - wsrep_thd_LOCK(thd); - DBUG_EXECUTE_IF("sync.wsrep_after_BF_victim_lock", - { - const char act[]= - "now " - "wait_for signal.wsrep_after_BF_victim_lock"; - DBUG_ASSERT(!debug_sync_set_action(bf_thd, - STRING_WITH_LEN(act))); - };); + unsigned long long victim_trx_id= static_cast<unsigned long long>(victim_trx->id); + unsigned long long bf_trx_id= bf_trx ? + static_cast<unsigned long long>(bf_trx->id) : + TRX_ID_MAX; + unsigned long victim_thread= thd_get_thread_id(thd); + long long victim_seqno= wsrep_thd_trx_seqno(thd); + unsigned long bf_thread= thd_get_thread_id(bf_thd); + long long bf_seqno= wsrep_thd_trx_seqno(bf_thd); + WSREP_LOG_CONFLICT(bf_thd, thd, TRUE); - if (wsrep_thd_query_state(thd) == QUERY_EXITING) { - WSREP_DEBUG("kill trx EXITING for " TRX_ID_FMT, - victim_trx->id); + WSREP_DEBUG("Aborter %s trx_id: %llu thread: %ld " + "seqno: %lld query_state: %s conflict_state: %s exec %s query: %s", + wsrep_thd_is_BF(bf_thd, false) ? "BF" : "normal", + bf_trx_id, + bf_thread, + bf_seqno, + wsrep_thd_query_state_str(bf_thd), + wsrep_thd_conflict_state_str(bf_thd), + wsrep_thd_exec_mode_str(bf_thd), + wsrep_thd_query(bf_thd)); + + WSREP_DEBUG("Victim %s trx_id: %llu thread: %ld " + "seqno: %lld query_state: %s conflict_state: %s exec %s query: %s", + wsrep_thd_is_BF(thd, false) ? "BF" : "normal", + victim_trx_id, + victim_thread, + victim_seqno, + wsrep_thd_query_state_str(thd), + wsrep_thd_conflict_state_str(thd), + wsrep_thd_exec_mode_str(thd), + wsrep_thd_query(thd)); + + if (wsrep_thd_query_state(thd) == QUERY_EXITING) + { + WSREP_DEBUG("Victim query state QUERY_EXITING trx: %llu" + " thread: %lu", + victim_trx_id, + victim_thread); wsrep_thd_UNLOCK(thd); DBUG_RETURN(0); } - if(wsrep_thd_exec_mode(thd) != LOCAL_STATE) { - WSREP_DEBUG("withdraw for BF trx: " TRX_ID_FMT ", state: %d", - victim_trx->id, - wsrep_thd_get_conflict_state(thd)); + if (wsrep_thd_exec_mode(thd) != LOCAL_STATE) + { + WSREP_DEBUG("Victim withdraw of non local for BF trx: %llu " + ", thread: %lu exec_mode: %s", + victim_trx_id, + victim_thread, + wsrep_thd_exec_mode_str(thd)); } - switch (wsrep_thd_get_conflict_state(thd)) { + switch (wsrep_thd_get_conflict_state(thd)) + { case NO_CONFLICT: + { + WSREP_DEBUG("Victim thread: %lu trx: %llu in NO_CONFLICT state", + victim_thread, + victim_trx_id); wsrep_thd_set_conflict_state(thd, MUST_ABORT); break; - case MUST_ABORT: - WSREP_DEBUG("victim " TRX_ID_FMT " in MUST ABORT state", - victim_trx->id); - wsrep_thd_UNLOCK(thd); + } + case MUST_ABORT: + { + WSREP_DEBUG("Victim thread: %lu trx: %llu in MUST_ABORT state", + victim_thread, + victim_trx_id); wsrep_thd_awake(thd, signal); DBUG_RETURN(0); break; + } case ABORTED: case ABORTING: // fall through default: - WSREP_DEBUG("victim " TRX_ID_FMT " in state %d", - victim_trx->id, wsrep_thd_get_conflict_state(thd)); + { + WSREP_DEBUG("Victim thread: %lu trx: %llu in state: %s", + victim_thread, + victim_trx_id, + wsrep_thd_conflict_state_str(thd)); wsrep_thd_UNLOCK(thd); DBUG_RETURN(0); break; } + } - switch (wsrep_thd_query_state(thd)) { + switch (wsrep_thd_query_state(thd)) + { case QUERY_COMMITTING: - enum wsrep_status rcode; + { + enum wsrep_status rcode=WSREP_OK; - WSREP_DEBUG("kill query for: %ld", - thd_get_thread_id(thd)); - WSREP_DEBUG("kill trx QUERY_COMMITTING for " TRX_ID_FMT, - victim_trx->id); + WSREP_DEBUG("Victim kill trx QUERY_COMMITTING state thread: %ld trx: %llu", + victim_thread, + victim_trx_id); - if (wsrep_thd_exec_mode(thd) == REPL_RECV) { - wsrep_abort_slave_trx(bf_seqno, - wsrep_thd_trx_seqno(thd)); + if (wsrep_thd_exec_mode(thd) == REPL_RECV) + { + WSREP_DEBUG("Victim REPL_RECV abort slave thread: %ld trx: %llu" + " bf_seqno: %lld victim_seqno: %lld", + victim_thread, + victim_trx_id, + bf_seqno, + victim_seqno); + + wsrep_abort_slave_trx(bf_seqno, victim_seqno); } else { wsrep_t *wsrep= get_wsrep(); - rcode = wsrep->abort_pre_commit( - wsrep, bf_seqno, - (wsrep_trx_id_t)wsrep_thd_ws_handle(thd)->trx_id - ); - switch (rcode) { + rcode= wsrep->abort_pre_commit(wsrep, bf_seqno, + (wsrep_trx_id_t)wsrep_thd_ws_handle(thd)->trx_id); + + switch (rcode) + { case WSREP_WARNING: - WSREP_DEBUG("cancel commit warning: " - TRX_ID_FMT, - victim_trx->id); - wsrep_thd_UNLOCK(thd); + { + WSREP_DEBUG("Victim cancel commit warning thread: %lu trx: %llu", + victim_thread, + victim_trx_id); + wsrep_thd_awake(thd, signal); DBUG_RETURN(1); break; + } case WSREP_OK: break; default: - WSREP_ERROR( - "cancel commit bad exit: %d " - TRX_ID_FMT, - rcode, - victim_trx->id); + { + WSREP_ERROR("Victim cancel commit bad commit exit thread: " + "%lu trx: %llu rcode: %d ", + victim_thread, + victim_trx_id, + rcode); /* unable to interrupt, must abort */ /* note: kill_mysql() will block, if we cannot. - * kill the lock holder first. - */ + * kill the lock holder first. */ abort(); break; } + } } - wsrep_thd_UNLOCK(thd); + wsrep_thd_awake(thd, signal); break; + } case QUERY_EXEC: + { /* it is possible that victim trx is itself waiting for some - * other lock. We need to cancel this waiting - */ - WSREP_DEBUG("kill trx QUERY_EXEC for " TRX_ID_FMT, - victim_trx->id); - - victim_trx->lock.was_chosen_as_deadlock_victim= TRUE; - if (victim_trx->lock.wait_lock) { - WSREP_DEBUG("victim has wait flag: %ld", - thd_get_thread_id(thd)); - lock_t* wait_lock = victim_trx->lock.wait_lock; - if (wait_lock) { - WSREP_DEBUG("canceling wait lock"); - victim_trx->lock.was_chosen_as_deadlock_victim= TRUE; - lock_cancel_waiting_and_release(wait_lock); - } + * other lock. We need to cancel this waiting */ + WSREP_DEBUG("Victim kill trx QUERY_EXEC state thread: %ld trx: %llu", + victim_thread, victim_trx_id); - wsrep_thd_UNLOCK(thd); + lock_t* wait_lock = victim_trx->lock.wait_lock; + + if (wait_lock) + { + WSREP_DEBUG("Victim thread: %lu trx: %llu has lock wait flag", + victim_thread, + victim_trx_id); + + victim_trx->lock.was_chosen_as_deadlock_victim= TRUE; + lock_cancel_waiting_and_release(wait_lock); wsrep_thd_awake(thd, signal); } else { - /* abort currently executing query */ - DBUG_PRINT("wsrep",("sending KILL_QUERY to: %lu", - thd_get_thread_id(thd))); - WSREP_DEBUG("kill query for: %ld", - thd_get_thread_id(thd)); - /* Note that innobase_kill_query will take lock_mutex - and trx_mutex */ - wsrep_thd_UNLOCK(thd); + /* Abort currently executing query */ + WSREP_DEBUG("Kill query for victim thread: %lu trx: %llu", + victim_thread, + victim_trx_id); + wsrep_thd_awake(thd, signal); /* for BF thd, we need to prevent him from committing */ - if (wsrep_thd_exec_mode(thd) == REPL_RECV) { - wsrep_abort_slave_trx(bf_seqno, - wsrep_thd_trx_seqno(thd)); + if (wsrep_thd_exec_mode(thd) == REPL_RECV) + { + WSREP_DEBUG("Victim REPL_RECV abort slave for thread: " + "%lu trx: %llu" + " bf_seqno: %lld victim_seqno: %lld", + victim_thread, + victim_trx_id, + bf_seqno, + victim_seqno); + + wsrep_abort_slave_trx(bf_seqno, victim_seqno); } } break; + } case QUERY_IDLE: { - WSREP_DEBUG("kill IDLE for " TRX_ID_FMT, victim_trx->id); + WSREP_DEBUG("Victim kill trx QUERY_IDLE state thread: %ld trx: %llu", + victim_thread, + victim_trx_id); + + if (wsrep_thd_exec_mode(thd) == REPL_RECV) + { + WSREP_DEBUG("Victim REPL_RECV kill BF IDLE, thread: %ld trx: " + "%llu bf_seqno: %lld victim_seqno: %lld", + victim_thread, + victim_trx_id, + bf_seqno, + victim_seqno); - if (wsrep_thd_exec_mode(thd) == REPL_RECV) { - WSREP_DEBUG("kill BF IDLE, seqno: %lld", - (long long)wsrep_thd_trx_seqno(thd)); wsrep_thd_UNLOCK(thd); - wsrep_abort_slave_trx(bf_seqno, - wsrep_thd_trx_seqno(thd)); + wsrep_abort_slave_trx(bf_seqno, victim_seqno); DBUG_RETURN(0); } - /* This will lock thd from proceeding after net_read() */ + + /* This will lock thd from proceeding after net_read() */ wsrep_thd_set_conflict_state(thd, ABORTING); wsrep_lock_rollback(); - if (wsrep_aborting_thd_contains(thd)) { - WSREP_WARN("duplicate thd aborter %lu", - thd_get_thread_id(thd)); + if (wsrep_aborting_thd_contains(thd)) + { + WSREP_WARN("Victim is duplicate thd aborter thread: %ld trx: %llu", + victim_thread, + victim_trx_id); } else { wsrep_aborting_thd_enqueue(thd); - DBUG_PRINT("wsrep",("enqueuing trx abort for %lu", - thd_get_thread_id(thd))); - WSREP_DEBUG("enqueuing trx abort for (%lu)", - thd_get_thread_id(thd)); + WSREP_DEBUG("Enqueuing victim thread: %ld trx: %llu for abort", + victim_thread, + victim_trx_id); } - DBUG_PRINT("wsrep",("signalling wsrep rollbacker")); - WSREP_DEBUG("signaling aborter"); wsrep_unlock_rollback(); wsrep_thd_UNLOCK(thd); break; } default: - WSREP_WARN("bad wsrep query state: %d", - wsrep_thd_query_state(thd)); + { + WSREP_WARN("Victim thread: %ld trx: %llu in bad wsrep query state: %s", + victim_thread, + victim_trx_id, + wsrep_thd_query_state_str(thd)); + wsrep_thd_UNLOCK(thd); + ut_error; break; } + } DBUG_RETURN(0); } @@ -18784,9 +18872,10 @@ wsrep_abort_transaction( my_bool signal) { DBUG_ENTER("wsrep_innobase_abort_thd"); - + ut_ad(bf_thd); + ut_ad(victim_thd); + trx_t* victim_trx = thd_to_trx(victim_thd); - trx_t* bf_trx = (bf_thd) ? thd_to_trx(bf_thd) : NULL; WSREP_DEBUG("abort transaction: BF: %s victim: %s victim conf: %d", wsrep_thd_query(bf_thd), @@ -18797,18 +18886,17 @@ wsrep_abort_transaction( lock_mutex_enter(); trx_mutex_enter(victim_trx); victim_trx->abort_type = TRX_WSREP_ABORT; - int rcode = wsrep_innobase_kill_one_trx(bf_thd, bf_trx, + int rcode = wsrep_innobase_kill_one_trx(bf_thd, victim_trx, signal); + victim_trx->abort_type = TRX_SERVER_ABORT; trx_mutex_exit(victim_trx); lock_mutex_exit(); - victim_trx->abort_type = TRX_SERVER_ABORT; wsrep_srv_conc_cancel_wait(victim_trx); DBUG_RETURN(rcode); } else { WSREP_DEBUG("victim does not have transaction"); wsrep_thd_LOCK(victim_thd); wsrep_thd_set_conflict_state(victim_thd, MUST_ABORT); - wsrep_thd_UNLOCK(victim_thd); wsrep_thd_awake(victim_thd, signal); } diff --git a/storage/innobase/include/ha_prototypes.h b/storage/innobase/include/ha_prototypes.h index bcf8a893695..84cde3c93d2 100644 --- a/storage/innobase/include/ha_prototypes.h +++ b/storage/innobase/include/ha_prototypes.h @@ -284,10 +284,11 @@ innobase_casedn_str( #ifdef WITH_WSREP UNIV_INTERN int -wsrep_innobase_kill_one_trx(void * const thd_ptr, - const trx_t * const bf_trx, - trx_t *victim_trx, - ibool signal); +wsrep_innobase_kill_one_trx( + THD* bf_thd, + trx_t *victim_trx, + bool signal); + int wsrep_innobase_mysql_sort(int mysql_type, uint charset_number, unsigned char* str, unsigned int str_length, unsigned int buf_length); diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc index 0c4e40067d1..3873d557afc 100644 --- a/storage/innobase/lock/lock0lock.cc +++ b/storage/innobase/lock/lock0lock.cc @@ -1809,7 +1809,7 @@ wsrep_kill_victim( lock->trx->abort_type = TRX_WSREP_ABORT; wsrep_innobase_kill_one_trx(trx->mysql_thd, - (const trx_t*) trx, lock->trx, TRUE); + lock->trx, true); lock->trx->abort_type = TRX_SERVER_ABORT; } } diff --git a/storage/xtradb/handler/ha_innodb.cc b/storage/xtradb/handler/ha_innodb.cc index 0a77ce56eaf..bc6e44b7988 100644 --- a/storage/xtradb/handler/ha_innodb.cc +++ b/storage/xtradb/handler/ha_innodb.cc @@ -19519,264 +19519,362 @@ static struct st_mysql_storage_engine innobase_storage_engine= { MYSQL_HANDLERTON_INTERFACE_VERSION }; #ifdef WITH_WSREP -void -wsrep_abort_slave_trx(wsrep_seqno_t bf_seqno, wsrep_seqno_t victim_seqno) -{ - WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be " - "caused by:\n\t" - "1) unsupported configuration options combination, please check documentation.\n\t" - "2) a bug in the code.\n\t" - "3) a database corruption.\n Node consistency compromized, " - "need to abort. Restart the node to resync with cluster.", - (long long)bf_seqno, (long long)victim_seqno); - abort(); -} -/*******************************************************************//** -This function is used to kill one transaction in BF. */ + +static void wsrep_abort_slave_trx(long long bf_seqno, long long victim_seqno) +{ + WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be " + "caused by:\n\t" + "1) unsupported configuration options combination, please check documentation.\n\t" + "2) a bug in the code.\n\t" + "3) a database corruption.\n Node consistency compromized, " + "need to abort. Restart the node to resync with cluster.", + bf_seqno, victim_seqno); + abort(); +} + +/** This function is used to kill one transaction. + +This transaction was open on this node (not-yet-committed), and a +conflicting writeset from some other node that was being applied +caused a locking conflict. First committed (from other node) +wins, thus open transaction is rolled back. BF stands for +brute-force: any transaction can get aborted by galera any time +it is necessary. + +This conflict can happen only when the replicated writeset (from +other node) is being applied, not when it’s waiting in the queue. +If our local transaction reached its COMMIT and this conflicting +writeset was in the queue, then it should fail the local +certification test instead. + +A brute force abort is only triggered by a locking conflict +between a writeset being applied by an applier thread (slave thread) +and an open transaction on the node, not by a Galera writeset +comparison as in the local certification failure. + +@param[in] bf_thd Brute force (BF) thread +@param[in,out] victim_trx Vimtim trx to be killed +@param[in] signal Should victim be signaled */ UNIV_INTERN int wsrep_innobase_kill_one_trx( - void * const bf_thd_ptr, - const trx_t * const bf_trx, + THD* bf_thd, trx_t *victim_trx, - ibool signal) + bool signal) { - ut_ad(lock_mutex_own()); - ut_ad(trx_mutex_own(victim_trx)); - ut_ad(bf_thd_ptr); - ut_ad(victim_trx); + ut_ad(bf_thd); + ut_ad(victim_trx); + ut_ad(lock_mutex_own()); + ut_ad(trx_mutex_own(victim_trx)); DBUG_ENTER("wsrep_innobase_kill_one_trx"); - THD *bf_thd = bf_thd_ptr ? (THD*) bf_thd_ptr : NULL; - THD *thd = (THD *) victim_trx->mysql_thd; - int64_t bf_seqno = (bf_thd) ? wsrep_thd_trx_seqno(bf_thd) : 0; - if (!thd) { - DBUG_PRINT("wsrep", ("no thd for conflicting lock")); - WSREP_WARN("no THD for trx: " TRX_ID_FMT, victim_trx->id); - DBUG_RETURN(1); - } + THD *thd= (THD *) victim_trx->mysql_thd; + ut_ad(thd); + /* Note that bf_trx might not exist here e.g. on MDL conflict + case (test: galera_concurrent_ctas). Similarly, BF thread + could be also acquiring MDL-lock causing victim to be + aborted. However, we have not yet called innobase_trx_init() + for BF transaction (test: galera_many_columns) */ + trx_t* bf_trx= thd_to_trx(bf_thd); - if (!bf_thd) { - DBUG_PRINT("wsrep", ("no BF thd for conflicting lock")); - WSREP_WARN("no BF THD for trx: " TRX_ID_FMT, - bf_trx ? bf_trx->id : 0); - DBUG_RETURN(1); - } + DBUG_EXECUTE_IF("sync.before_wsrep_thd_abort", + { + const char act[]= + "now " + "SIGNAL sync.before_wsrep_thd_abort_reached " + "WAIT_FOR signal.before_wsrep_thd_abort"; + DBUG_ASSERT(!debug_sync_set_action(bf_thd, + STRING_WITH_LEN(act))); + };); - WSREP_LOG_CONFLICT(bf_thd, thd, TRUE); + wsrep_thd_LOCK(thd); - WSREP_DEBUG("BF kill (%lu, seqno: %lld), victim: (%lu) trx: " - TRX_ID_FMT, - signal, (long long)bf_seqno, - thd_get_thread_id(thd), - victim_trx->id); + if (wsrep_thd_set_wsrep_killed(thd)) + { + WSREP_DEBUG("innodb kill transaction skipped due to wsrep_killed"); + wsrep_thd_UNLOCK(thd); + DBUG_RETURN(0); + } - WSREP_DEBUG("Aborting query: %s", - (thd && wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void"); - - wsrep_thd_LOCK(thd); - DBUG_EXECUTE_IF("sync.wsrep_after_BF_victim_lock", - { - const char act[]= - "now " - "wait_for signal.wsrep_after_BF_victim_lock"; - DBUG_ASSERT(!debug_sync_set_action(bf_thd, - STRING_WITH_LEN(act))); - };); + unsigned long long victim_trx_id= static_cast<unsigned long long>(victim_trx->id); + unsigned long long bf_trx_id= bf_trx ? + static_cast<unsigned long long>(bf_trx->id) : + TRX_ID_MAX; + unsigned long victim_thread= thd_get_thread_id(thd); + long long victim_seqno= wsrep_thd_trx_seqno(thd); + unsigned long bf_thread= thd_get_thread_id(bf_thd); + long long bf_seqno= wsrep_thd_trx_seqno(bf_thd); + WSREP_LOG_CONFLICT(bf_thd, thd, TRUE); - if (wsrep_thd_query_state(thd) == QUERY_EXITING) { - WSREP_DEBUG("kill trx EXITING for " TRX_ID_FMT, - victim_trx->id); + WSREP_DEBUG("Aborter %s trx_id: %llu thread: %ld " + "seqno: %lld query_state: %s conflict_state: %s exec %s query: %s", + wsrep_thd_is_BF(bf_thd, false) ? "BF" : "normal", + bf_trx_id, + bf_thread, + bf_seqno, + wsrep_thd_query_state_str(bf_thd), + wsrep_thd_conflict_state_str(bf_thd), + wsrep_thd_exec_mode_str(bf_thd), + wsrep_thd_query(bf_thd)); + + WSREP_DEBUG("Victim %s trx_id: %llu thread: %ld " + "seqno: %lld query_state: %s conflict_state: %s exec %s query: %s", + wsrep_thd_is_BF(thd, false) ? "BF" : "normal", + victim_trx_id, + victim_thread, + victim_seqno, + wsrep_thd_query_state_str(thd), + wsrep_thd_conflict_state_str(thd), + wsrep_thd_exec_mode_str(thd), + wsrep_thd_query(thd)); + + if (wsrep_thd_query_state(thd) == QUERY_EXITING) + { + WSREP_DEBUG("Victim query state QUERY_EXITING trx: %llu" + " thread: %lu", + victim_trx_id, + victim_thread); wsrep_thd_UNLOCK(thd); DBUG_RETURN(0); } - if(wsrep_thd_exec_mode(thd) != LOCAL_STATE) { - WSREP_DEBUG("withdraw for BF trx: " TRX_ID_FMT ", state: %d", - victim_trx->id, - wsrep_thd_get_conflict_state(thd)); + if (wsrep_thd_exec_mode(thd) != LOCAL_STATE) + { + WSREP_DEBUG("Victim withdraw of non local for BF trx: %llu " + ", thread: %lu exec_mode: %s", + victim_trx_id, + victim_thread, + wsrep_thd_exec_mode_str(thd)); } - switch (wsrep_thd_get_conflict_state(thd)) { + switch (wsrep_thd_get_conflict_state(thd)) + { case NO_CONFLICT: + { + WSREP_DEBUG("Victim thread: %lu trx: %llu in NO_CONFLICT state", + victim_thread, + victim_trx_id); wsrep_thd_set_conflict_state(thd, MUST_ABORT); break; - case MUST_ABORT: - WSREP_DEBUG("victim " TRX_ID_FMT " in MUST ABORT state", - victim_trx->id); - wsrep_thd_UNLOCK(thd); + } + case MUST_ABORT: + { + WSREP_DEBUG("Victim thread: %lu trx: %llu in MUST_ABORT state", + victim_thread, + victim_trx_id); wsrep_thd_awake(thd, signal); DBUG_RETURN(0); break; + } case ABORTED: case ABORTING: // fall through default: - WSREP_DEBUG("victim " TRX_ID_FMT " in state %d", - victim_trx->id, wsrep_thd_get_conflict_state(thd)); + { + WSREP_DEBUG("Victim thread: %lu trx: %llu in state: %s", + victim_thread, + victim_trx_id, + wsrep_thd_conflict_state_str(thd)); wsrep_thd_UNLOCK(thd); DBUG_RETURN(0); break; } + } - switch (wsrep_thd_query_state(thd)) { + switch (wsrep_thd_query_state(thd)) + { case QUERY_COMMITTING: - enum wsrep_status rcode; + { + enum wsrep_status rcode=WSREP_OK; - WSREP_DEBUG("kill query for: %ld", - thd_get_thread_id(thd)); - WSREP_DEBUG("kill trx QUERY_COMMITTING for " TRX_ID_FMT, - victim_trx->id); + WSREP_DEBUG("Victim kill trx QUERY_COMMITTING state thread: %ld trx: %llu", + victim_thread, + victim_trx_id); - if (wsrep_thd_exec_mode(thd) == REPL_RECV) { - wsrep_abort_slave_trx(bf_seqno, - wsrep_thd_trx_seqno(thd)); + if (wsrep_thd_exec_mode(thd) == REPL_RECV) + { + WSREP_DEBUG("Victim REPL_RECV abort slave thread: %ld trx: %llu" + " bf_seqno: %lld victim_seqno: %lld", + victim_thread, + victim_trx_id, + bf_seqno, + victim_seqno); + + wsrep_abort_slave_trx(bf_seqno, victim_seqno); } else { wsrep_t *wsrep= get_wsrep(); - rcode = wsrep->abort_pre_commit( - wsrep, bf_seqno, - (wsrep_trx_id_t)victim_trx->id - ); - switch (rcode) { + rcode= wsrep->abort_pre_commit(wsrep, bf_seqno, + (wsrep_trx_id_t)wsrep_thd_ws_handle(thd)->trx_id); + + switch (rcode) + { case WSREP_WARNING: - WSREP_DEBUG("cancel commit warning: " - TRX_ID_FMT, - victim_trx->id); - wsrep_thd_UNLOCK(thd); + { + WSREP_DEBUG("Victim cancel commit warning thread: %lu trx: %llu", + victim_thread, + victim_trx_id); + wsrep_thd_awake(thd, signal); DBUG_RETURN(1); break; + } case WSREP_OK: break; default: - WSREP_ERROR( - "cancel commit bad exit: %d " - TRX_ID_FMT, - rcode, - victim_trx->id); + { + WSREP_ERROR("Victim cancel commit bad commit exit thread: " + "%lu trx: %llu rcode: %d ", + victim_thread, + victim_trx_id, + rcode); /* unable to interrupt, must abort */ /* note: kill_mysql() will block, if we cannot. - * kill the lock holder first. - */ + * kill the lock holder first. */ abort(); break; } + } } - wsrep_thd_UNLOCK(thd); + wsrep_thd_awake(thd, signal); break; + } case QUERY_EXEC: + { /* it is possible that victim trx is itself waiting for some - * other lock. We need to cancel this waiting - */ - WSREP_DEBUG("kill trx QUERY_EXEC for " TRX_ID_FMT, - victim_trx->id); - - victim_trx->lock.was_chosen_as_deadlock_victim= TRUE; - if (victim_trx->lock.wait_lock) { - WSREP_DEBUG("victim has wait flag: %ld", - thd_get_thread_id(thd)); - lock_t* wait_lock = victim_trx->lock.wait_lock; - if (wait_lock) { - WSREP_DEBUG("canceling wait lock"); - victim_trx->lock.was_chosen_as_deadlock_victim= TRUE; - lock_cancel_waiting_and_release(wait_lock); - } + * other lock. We need to cancel this waiting */ + WSREP_DEBUG("Victim kill trx QUERY_EXEC state thread: %ld trx: %llu", + victim_thread, victim_trx_id); - wsrep_thd_UNLOCK(thd); + lock_t* wait_lock = victim_trx->lock.wait_lock; + + if (wait_lock) + { + WSREP_DEBUG("Victim thread: %lu trx: %llu has lock wait flag", + victim_thread, + victim_trx_id); + + victim_trx->lock.was_chosen_as_deadlock_victim= TRUE; + lock_cancel_waiting_and_release(wait_lock); wsrep_thd_awake(thd, signal); } else { - /* abort currently executing query */ - DBUG_PRINT("wsrep",("sending KILL_QUERY to: %lu", - thd_get_thread_id(thd))); - WSREP_DEBUG("kill query for: %ld", - thd_get_thread_id(thd)); - /* Note that innobase_kill_query will take lock_mutex - and trx_mutex */ - wsrep_thd_UNLOCK(thd); + /* Abort currently executing query */ + WSREP_DEBUG("Kill query for victim thread: %lu trx: %llu", + victim_thread, + victim_trx_id); + wsrep_thd_awake(thd, signal); /* for BF thd, we need to prevent him from committing */ - if (wsrep_thd_exec_mode(thd) == REPL_RECV) { - wsrep_abort_slave_trx(bf_seqno, - wsrep_thd_trx_seqno(thd)); + if (wsrep_thd_exec_mode(thd) == REPL_RECV) + { + WSREP_DEBUG("Victim REPL_RECV abort slave for thread: " + "%lu trx: %llu" + " bf_seqno: %lld victim_seqno: %lld", + victim_thread, + victim_trx_id, + bf_seqno, + victim_seqno); + + wsrep_abort_slave_trx(bf_seqno, victim_seqno); } } break; + } case QUERY_IDLE: { - WSREP_DEBUG("kill IDLE for " TRX_ID_FMT, victim_trx->id); + WSREP_DEBUG("Victim kill trx QUERY_IDLE state thread: %ld trx: %llu", + victim_thread, + victim_trx_id); + + if (wsrep_thd_exec_mode(thd) == REPL_RECV) + { + WSREP_DEBUG("Victim REPL_RECV kill BF IDLE, thread: %ld trx: " + "%llu bf_seqno: %lld victim_seqno: %lld", + victim_thread, + victim_trx_id, + bf_seqno, + victim_seqno); - if (wsrep_thd_exec_mode(thd) == REPL_RECV) { - WSREP_DEBUG("kill BF IDLE, seqno: %lld", - (long long)wsrep_thd_trx_seqno(thd)); wsrep_thd_UNLOCK(thd); - wsrep_abort_slave_trx(bf_seqno, - wsrep_thd_trx_seqno(thd)); + wsrep_abort_slave_trx(bf_seqno, victim_seqno); DBUG_RETURN(0); } - /* This will lock thd from proceeding after net_read() */ + + /* This will lock thd from proceeding after net_read() */ wsrep_thd_set_conflict_state(thd, ABORTING); wsrep_lock_rollback(); - if (wsrep_aborting_thd_contains(thd)) { - WSREP_WARN("duplicate thd aborter %lu", - thd_get_thread_id(thd)); + if (wsrep_aborting_thd_contains(thd)) + { + WSREP_WARN("Victim is duplicate thd aborter thread: %ld trx: %llu", + victim_thread, + victim_trx_id); } else { wsrep_aborting_thd_enqueue(thd); - DBUG_PRINT("wsrep",("enqueuing trx abort for %lu", - thd_get_thread_id(thd))); - WSREP_DEBUG("enqueuing trx abort for (%lu)", - thd_get_thread_id(thd)); + WSREP_DEBUG("Enqueuing victim thread: %ld trx: %llu for abort", + victim_thread, + victim_trx_id); } - DBUG_PRINT("wsrep",("signalling wsrep rollbacker")); - WSREP_DEBUG("signaling aborter"); wsrep_unlock_rollback(); wsrep_thd_UNLOCK(thd); break; } default: - WSREP_WARN("bad wsrep query state: %d", - wsrep_thd_query_state(thd)); + { + WSREP_WARN("Victim thread: %ld trx: %llu in bad wsrep query state: %s", + victim_thread, + victim_trx_id, + wsrep_thd_query_state_str(thd)); + wsrep_thd_UNLOCK(thd); + ut_error; break; } + } DBUG_RETURN(0); } -static int -wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd, - my_bool signal) +static +int +wsrep_abort_transaction( + handlerton* hton, + THD *bf_thd, + THD *victim_thd, + my_bool signal) { DBUG_ENTER("wsrep_innobase_abort_thd"); - trx_t* victim_trx = thd_to_trx(victim_thd); - trx_t* bf_trx = (bf_thd) ? thd_to_trx(bf_thd) : NULL; - WSREP_DEBUG("abort transaction: BF: %s victim: %s", - wsrep_thd_query(bf_thd), - wsrep_thd_query(victim_thd)); + ut_ad(bf_thd); + ut_ad(victim_thd); + + trx_t* victim_trx = thd_to_trx(victim_thd); + + WSREP_DEBUG("abort transaction: BF: %s victim: %s victim conf: %d", + wsrep_thd_query(bf_thd), + wsrep_thd_query(victim_thd), + wsrep_thd_conflict_state(victim_thd, FALSE)); if (victim_trx) { lock_mutex_enter(); trx_mutex_enter(victim_trx); victim_trx->abort_type = TRX_WSREP_ABORT; - int rcode = wsrep_innobase_kill_one_trx(bf_thd, bf_trx, + int rcode = wsrep_innobase_kill_one_trx(bf_thd, victim_trx, signal); + victim_trx->abort_type = TRX_SERVER_ABORT; trx_mutex_exit(victim_trx); lock_mutex_exit(); - victim_trx->abort_type = TRX_SERVER_ABORT; wsrep_srv_conc_cancel_wait(victim_trx); DBUG_RETURN(rcode); } else { WSREP_DEBUG("victim does not have transaction"); wsrep_thd_LOCK(victim_thd); wsrep_thd_set_conflict_state(victim_thd, MUST_ABORT); - wsrep_thd_UNLOCK(victim_thd); wsrep_thd_awake(victim_thd, signal); } diff --git a/storage/xtradb/include/ha_prototypes.h b/storage/xtradb/include/ha_prototypes.h index e5b545e0727..266b5250a8a 100644 --- a/storage/xtradb/include/ha_prototypes.h +++ b/storage/xtradb/include/ha_prototypes.h @@ -303,10 +303,11 @@ innobase_casedn_str( #ifdef WITH_WSREP UNIV_INTERN int -wsrep_innobase_kill_one_trx(void * const thd_ptr, - const trx_t * const bf_trx, - trx_t *victim_trx, - ibool signal); +wsrep_innobase_kill_one_trx( + THD* bf_thd, + trx_t *victim_trx, + bool signal); + int wsrep_innobase_mysql_sort(int mysql_type, uint charset_number, unsigned char* str, unsigned int str_length, unsigned int buf_length); diff --git a/storage/xtradb/lock/lock0lock.cc b/storage/xtradb/lock/lock0lock.cc index 4d40111ac20..844d81886da 100644 --- a/storage/xtradb/lock/lock0lock.cc +++ b/storage/xtradb/lock/lock0lock.cc @@ -1820,7 +1820,7 @@ wsrep_kill_victim( lock->trx->abort_type = TRX_WSREP_ABORT; wsrep_innobase_kill_one_trx(trx->mysql_thd, - (const trx_t*) trx, lock->trx, TRUE); + lock->trx, true); lock->trx->abort_type = TRX_SERVER_ABORT; } } |