summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/mysql/service_wsrep.h8
-rw-r--r--mysql-test/suite/galera/r/galera_bf_kill.result120
-rw-r--r--mysql-test/suite/galera/t/MW-388.test2
-rw-r--r--mysql-test/suite/galera/t/MW-86-wait1.test1
-rw-r--r--mysql-test/suite/galera/t/MW-86-wait8.test1
-rw-r--r--mysql-test/suite/galera/t/galera_bf_kill.test273
-rw-r--r--mysql-test/suite/galera/t/galera_bf_lock_wait.test1
-rw-r--r--mysql-test/suite/galera/t/galera_query_cache_sync_wait.test1
-rw-r--r--mysql-test/suite/galera/t/galera_var_retry_autocommit.test1
-rw-r--r--sql/sql_class.cc5
-rw-r--r--sql/sql_class.h10
-rw-r--r--sql/sql_parse.cc20
-rw-r--r--sql/sql_plugin_services.ic3
-rw-r--r--sql/wsrep_dummy.cc9
-rw-r--r--sql/wsrep_mysqld.cc22
-rw-r--r--storage/innobase/handler/ha_innodb.cc378
-rw-r--r--storage/innobase/include/ha_prototypes.h9
-rw-r--r--storage/innobase/lock/lock0lock.cc2
-rw-r--r--storage/xtradb/handler/ha_innodb.cc396
-rw-r--r--storage/xtradb/include/ha_prototypes.h9
-rw-r--r--storage/xtradb/lock/lock0lock.cc2
21 files changed, 954 insertions, 319 deletions
diff --git a/include/mysql/service_wsrep.h b/include/mysql/service_wsrep.h
index 499fbf2c173..7b943dedff8 100644
--- a/include/mysql/service_wsrep.h
+++ b/include/mysql/service_wsrep.h
@@ -99,7 +99,7 @@ extern struct wsrep_service_st {
enum wsrep_conflict_state (*wsrep_thd_get_conflict_state_func)(MYSQL_THD);
my_bool (*wsrep_thd_is_BF_func)(MYSQL_THD , my_bool);
my_bool (*wsrep_thd_is_wsrep_func)(MYSQL_THD thd);
- char * (*wsrep_thd_query_func)(THD *thd);
+ const char * (*wsrep_thd_query_func)(THD *thd);
enum wsrep_query_state (*wsrep_thd_query_state_func)(THD *thd);
const char * (*wsrep_thd_query_state_str_func)(THD *thd);
int (*wsrep_thd_retry_counter_func)(THD *thd);
@@ -112,6 +112,7 @@ extern struct wsrep_service_st {
int (*wsrep_trx_order_before_func)(MYSQL_THD, MYSQL_THD);
void (*wsrep_unlock_rollback_func)();
void (*wsrep_set_data_home_dir_func)(const char *data_dir);
+ bool (*wsrep_thd_set_wsrep_killed_func)(MYSQL_THD thd);
} *wsrep_service;
#ifdef MYSQL_DYNAMIC_PLUGIN
@@ -163,6 +164,7 @@ extern struct wsrep_service_st {
#define wsrep_drupal_282555_workaround get_wsrep_drupal_282555_workaround()
#define wsrep_recovery get_wsrep_recovery()
#define wsrep_protocol_version get_wsrep_protocol_version()
+#define wsrep_thd_set_wsrep_killed(T) wsrep_service->wsrep_thd_set_wsrep_killed_func(T)
#else
@@ -176,7 +178,7 @@ extern long wsrep_protocol_version;
bool wsrep_consistency_check(THD *thd);
bool wsrep_prepare_key(const unsigned char* cache_key, size_t cache_key_len, const unsigned char* row_id, size_t row_id_len, struct wsrep_buf* key, size_t* key_len);
-char *wsrep_thd_query(THD *thd);
+const char *wsrep_thd_query(THD *thd);
const char *wsrep_thd_conflict_state_str(THD *thd);
const char *wsrep_thd_exec_mode_str(THD *thd);
const char *wsrep_thd_query_state_str(THD *thd);
@@ -214,8 +216,10 @@ void wsrep_thd_set_conflict_state(THD *thd, enum wsrep_conflict_state state);
bool wsrep_thd_ignore_table(THD *thd);
void wsrep_unlock_rollback();
void wsrep_set_data_home_dir(const char *data_dir);
+bool wsrep_thd_set_wsrep_killed(MYSQL_THD thd);
#endif
+/* set wsrep_killed flag for the target THD */
#ifdef __cplusplus
}
diff --git a/mysql-test/suite/galera/r/galera_bf_kill.result b/mysql-test/suite/galera/r/galera_bf_kill.result
new file mode 100644
index 00000000000..fd3a728902c
--- /dev/null
+++ b/mysql-test/suite/galera/r/galera_bf_kill.result
@@ -0,0 +1,120 @@
+connection node_2;
+connection node_1;
+connection node_2;
+CREATE TABLE t1(a int not null primary key auto_increment,b int) engine=InnoDB;
+insert into t1 values (NULL,1);
+connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2a;
+begin;
+update t1 set a = 5;
+connection node_2;
+select * from t1;
+a b
+2 1
+disconnect node_2a;
+connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2a;
+begin;
+update t1 set a =5;
+connection node_2;
+select * from t1;
+a b
+2 1
+disconnect node_2a;
+connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2a;
+begin;
+update t1 set a =5, b=2;
+connection node_2;
+ALTER TABLE t1 ADD UNIQUE KEY b1(b);
+ALTER TABLE t1 DROP KEY b1;
+select * from t1;
+a b
+2 1
+disconnect node_2a;
+connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2a;
+begin;
+update t1 set a =5, b=2;
+connect node_2b, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2b;
+begin;
+update t1 set a =6, b=7;
+connection node_2;
+ALTER TABLE t1 ADD UNIQUE KEY b2(b);
+ALTER TABLE t1 DROP KEY b2;
+select * from t1;
+a b
+2 1
+disconnect node_2a;
+disconnect node_2b;
+connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2a;
+SET SESSION wsrep_on=OFF;
+begin;
+update t1 set a =5, b=2;
+connection node_2;
+ALTER TABLE t1 ADD UNIQUE KEY b3(b);
+select * from t1;
+a b
+2 1
+disconnect node_2a;
+connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2a;
+SET SESSION wsrep_on=OFF;
+begin;
+update t1 set a =5, b=2;
+connection node_2;
+select * from t1;
+a b
+2 1
+disconnect node_2a;
+connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2a;
+truncate t1;
+insert into t1 values (1,0);
+begin;
+update t1 set b=2 where a=1;
+connection node_2;
+set session wsrep_sync_wait=0;
+connect node_2b, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2b;
+SET GLOBAL debug_dbug = "d,sync.before_wsrep_thd_abort";
+connection node_1;
+select * from t1;
+a b
+1 0
+update t1 set b= 1 where a=1;
+connection node_2b;
+SET SESSION DEBUG_SYNC = "now WAIT_FOR sync.before_wsrep_thd_abort_reached";
+connection node_2;
+SET DEBUG_SYNC= 'before_awake_no_mutex SIGNAL awake_reached WAIT_FOR continue_kill';
+connection node_2b;
+SET DEBUG_SYNC='now WAIT_FOR awake_reached';
+SET GLOBAL debug_dbug = "";
+SET DEBUG_SYNC = "now SIGNAL signal.before_wsrep_thd_abort";
+SET DEBUG_SYNC = "now SIGNAL continue_kill";
+connection node_2;
+connection node_2a;
+select * from t1;
+connection node_2;
+SET DEBUG_SYNC = "RESET";
+drop table t1;
+disconnect node_2a;
+connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2a;
+CREATE TABLE t1 (i int primary key);
+SET DEBUG_SYNC = "before_wsrep_ordered_commit SIGNAL bwoc_reached WAIT_FOR bwoc_continue";
+INSERT INTO t1 VALUES (1);
+connection node_2;
+SET DEBUG_SYNC = "now WAIT_FOR bwoc_reached";
+SET DEBUG_SYNC = "now SIGNAL bwoc_continue";
+SET DEBUG_SYNC='RESET';
+connection node_2a;
+connection node_2;
+select * from t1;
+i
+1
+disconnect node_2a;
+connection node_2;
+drop table t1;
diff --git a/mysql-test/suite/galera/t/MW-388.test b/mysql-test/suite/galera/t/MW-388.test
index fafdde092bf..2d3fe5b5d93 100644
--- a/mysql-test/suite/galera/t/MW-388.test
+++ b/mysql-test/suite/galera/t/MW-388.test
@@ -1,5 +1,7 @@
--source include/galera_cluster.inc
+--source include/have_debug.inc
--source include/have_debug_sync.inc
+--source include/galera_have_debug_sync.inc
--connection node_1
CREATE TABLE t1 (f1 INTEGER PRIMARY KEY, f2 CHAR(255)) Engine=InnoDB;
diff --git a/mysql-test/suite/galera/t/MW-86-wait1.test b/mysql-test/suite/galera/t/MW-86-wait1.test
index a7476b74e68..c5b78be64e5 100644
--- a/mysql-test/suite/galera/t/MW-86-wait1.test
+++ b/mysql-test/suite/galera/t/MW-86-wait1.test
@@ -7,6 +7,7 @@
--source include/have_binlog_format_row.inc
--source include/have_debug.inc
--source include/have_debug_sync.inc
+--source include/galera_have_debug_sync.inc
--connection node_2
# Make sure no signals have been leftover from previous tests to surprise us.
diff --git a/mysql-test/suite/galera/t/MW-86-wait8.test b/mysql-test/suite/galera/t/MW-86-wait8.test
index 551b0f67b7c..809f5047c51 100644
--- a/mysql-test/suite/galera/t/MW-86-wait8.test
+++ b/mysql-test/suite/galera/t/MW-86-wait8.test
@@ -4,6 +4,7 @@
--source include/galera_cluster.inc
--source include/have_binlog_format_row.inc
--source include/have_debug_sync.inc
+--source include/galera_have_debug_sync.inc
--connection node_2
# Make sure no signals have been leftover from previous tests to surprise us.
diff --git a/mysql-test/suite/galera/t/galera_bf_kill.test b/mysql-test/suite/galera/t/galera_bf_kill.test
new file mode 100644
index 00000000000..f09a4ba52db
--- /dev/null
+++ b/mysql-test/suite/galera/t/galera_bf_kill.test
@@ -0,0 +1,273 @@
+--source include/galera_cluster.inc
+--source include/have_innodb.inc
+--source include/have_debug.inc
+--source include/have_debug_sync.inc
+
+#
+# Test case 1: Start a transaction on node_2a and kill it
+# from other connection on same node
+#
+
+--connection node_2
+CREATE TABLE t1(a int not null primary key auto_increment,b int) engine=InnoDB;
+insert into t1 values (NULL,1);
+
+--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2a
+begin;
+update t1 set a = 5;
+
+--connection node_2
+
+--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1
+--source include/wait_condition.inc
+
+--let $k_thread = `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1`
+
+--disable_query_log
+--eval KILL $k_thread
+--enable_query_log
+
+select * from t1;
+--disconnect node_2a
+
+#
+# Test case 2: Start a transaction on node_2a and use
+# kill query from other connection on same node
+#
+
+--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2a
+begin;
+update t1 set a =5;
+
+--connection node_2
+--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1
+--source include/wait_condition.inc
+
+--let $k_thread = `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1`
+
+--disable_query_log
+--eval KILL QUERY $k_thread
+--enable_query_log
+
+select * from t1;
+--disconnect node_2a
+#
+# Test case 3: Start a transaction on node_2a and start a DDL on other transaction
+# that will then abort node_2a transaction
+#
+--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2a
+begin;
+update t1 set a =5, b=2;
+
+--connection node_2
+ALTER TABLE t1 ADD UNIQUE KEY b1(b);
+ALTER TABLE t1 DROP KEY b1;
+
+select * from t1;
+
+--disconnect node_2a
+
+#
+# Test case 4: Start a transaction on node_2a and conflicting transaction on node_2b
+# and start a DDL on other transaction that will then abort node_2a and node_2b
+# transactions
+#
+
+--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2a
+begin;
+update t1 set a =5, b=2;
+
+--connect node_2b, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2b
+begin;
+send update t1 set a =6, b=7;
+
+--connection node_2
+ALTER TABLE t1 ADD UNIQUE KEY b2(b);
+ALTER TABLE t1 DROP KEY b2;
+
+select * from t1;
+
+--disconnect node_2a
+--disconnect node_2b
+
+#
+# Test case 5: Start a transaction on node_2a with wsrep disabled
+# and start a DDL on other transaction that will then abort node_2a
+# transactions
+#
+
+--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2a
+SET SESSION wsrep_on=OFF;
+begin;
+update t1 set a =5, b=2;
+
+--connection node_2
+ALTER TABLE t1 ADD UNIQUE KEY b3(b);
+
+select * from t1;
+
+--disconnect node_2a
+
+#
+# Test case 6: Start a transaction on node_2a with wsrep disabled
+# and kill it from other connection on same node
+#
+
+--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2a
+SET SESSION wsrep_on=OFF;
+begin;
+update t1 set a =5, b=2;
+
+--connection node_2
+--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1
+--source include/wait_condition.inc
+
+--let $k_thread = `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1`
+
+--disable_query_log
+--eval KILL $k_thread
+--enable_query_log
+
+
+select * from t1;
+
+--disconnect node_2a
+
+#
+# Test case 7:
+# 1. Start a transaction on node_2,
+# and leave it pending while holding a row locked
+# 2. set sync point pause applier
+# 3. send a conflicting write on node_1, it will pause
+# at the sync point
+# 4. through another connection to node_2, kill the local
+# transaction
+#
+
+#
+# connection node_2a runs a local transaction, that is victim of BF abort
+# and victim of KILL command by connection node_2
+#
+--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2a
+truncate t1;
+insert into t1 values (1,0);
+
+# start a transaction that will conflict with later applier
+begin;
+update t1 set b=2 where a=1;
+
+--connection node_2
+set session wsrep_sync_wait=0;
+--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1
+--source include/wait_condition.inc
+
+--let $k_thread = `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1`
+
+# connection node_2b is for controlling debug sync points
+# first set a sync point for applier, to pause during BF aborting
+# and before THD::awake would be called
+#
+--connect node_2b, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2b
+SET GLOBAL debug_dbug = "d,sync.before_wsrep_thd_abort";
+
+#
+# replicate an update, which will BF abort the victim node_2a
+# however, while applier in node 2 is handling the abort,
+# it will pause in sync point set by node_2b
+#
+--connection node_1
+select * from t1;
+update t1 set b= 1 where a=1;
+
+#
+# wait until the applying of above update has reached the sync point
+# in node 2
+#
+--connection node_2b
+SET SESSION DEBUG_SYNC = "now WAIT_FOR sync.before_wsrep_thd_abort_reached";
+
+--connection node_2
+#
+# pause KILL execution before awake
+#
+SET DEBUG_SYNC= 'before_awake_no_mutex SIGNAL awake_reached WAIT_FOR continue_kill';
+--disable_query_log
+--send_eval KILL $k_thread
+--enable_query_log
+
+
+--connection node_2b
+SET DEBUG_SYNC='now WAIT_FOR awake_reached';
+
+# release applier and KILL operator
+SET GLOBAL debug_dbug = "";
+SET DEBUG_SYNC = "now SIGNAL signal.before_wsrep_thd_abort";
+SET DEBUG_SYNC = "now SIGNAL continue_kill";
+
+--connection node_2
+--reap
+
+--connection node_2a
+--error 0,1213
+select * from t1;
+
+--connection node_2
+SET DEBUG_SYNC = "RESET";
+
+drop table t1;
+
+--disconnect node_2a
+#
+# Test case 8:
+# run a transaction in node 2, and set a sync point to pause the transaction
+# in commit phase.
+# Through another connection to node 2, kill the committing transaction by
+# KILL QUERY command
+#
+
+--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2a
+--let $connection_id = `SELECT CONNECTION_ID()`
+
+CREATE TABLE t1 (i int primary key);
+
+# Set up sync point
+SET DEBUG_SYNC = "before_wsrep_ordered_commit SIGNAL bwoc_reached WAIT_FOR bwoc_continue";
+
+# Send insert which will block in the sync point above
+--send INSERT INTO t1 VALUES (1)
+
+--connection node_2
+SET DEBUG_SYNC = "now WAIT_FOR bwoc_reached";
+
+--disable_query_log
+--disable_result_log
+# victim has passed the point of no return, kill is not possible anymore
+--eval KILL QUERY $connection_id
+--enable_result_log
+--enable_query_log
+
+SET DEBUG_SYNC = "now SIGNAL bwoc_continue";
+SET DEBUG_SYNC='RESET';
+--connection node_2a
+--error 0,1213
+--reap
+
+--connection node_2
+# victim was able to complete the INSERT
+select * from t1;
+
+--disconnect node_2a
+
+--connection node_2
+drop table t1;
+
diff --git a/mysql-test/suite/galera/t/galera_bf_lock_wait.test b/mysql-test/suite/galera/t/galera_bf_lock_wait.test
index e3a9077a888..7a036ee4781 100644
--- a/mysql-test/suite/galera/t/galera_bf_lock_wait.test
+++ b/mysql-test/suite/galera/t/galera_bf_lock_wait.test
@@ -1,6 +1,7 @@
--source include/galera_cluster.inc
--source include/big_test.inc
+--connection node_1
CREATE TABLE t1 ENGINE=InnoDB select 1 as a, 1 as b union select 2, 2;
ALTER TABLE t1 add primary key(a);
diff --git a/mysql-test/suite/galera/t/galera_query_cache_sync_wait.test b/mysql-test/suite/galera/t/galera_query_cache_sync_wait.test
index e13e7f1f748..dbe50a8b93c 100644
--- a/mysql-test/suite/galera/t/galera_query_cache_sync_wait.test
+++ b/mysql-test/suite/galera/t/galera_query_cache_sync_wait.test
@@ -1,6 +1,7 @@
--source include/galera_cluster.inc
--source include/have_debug_sync.inc
--source include/have_query_cache.inc
+--source include/galera_have_debug_sync.inc
CREATE TABLE t1 (id INT PRIMARY KEY AUTO_INCREMENT) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
diff --git a/mysql-test/suite/galera/t/galera_var_retry_autocommit.test b/mysql-test/suite/galera/t/galera_var_retry_autocommit.test
index 142f02546b4..496219711e9 100644
--- a/mysql-test/suite/galera/t/galera_var_retry_autocommit.test
+++ b/mysql-test/suite/galera/t/galera_var_retry_autocommit.test
@@ -5,6 +5,7 @@
--source include/galera_cluster.inc
--source include/have_innodb.inc
--source include/have_debug_sync.inc
+--source include/galera_have_debug_sync.inc
--connect node_1a, 127.0.0.1, root, , test, $NODE_MYPORT_1
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 6897a26bda1..85435203a34 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -798,7 +798,8 @@ THD::THD(bool is_wsrep_applier)
wsrep_po_handle(WSREP_PO_INITIALIZER),
wsrep_po_cnt(0),
wsrep_apply_format(0),
- wsrep_ignore_table(false)
+ wsrep_ignore_table(false),
+ wsrep_killed(false)
#endif
{
ulong tmp;
@@ -1349,6 +1350,7 @@ void THD::init(void)
wsrep_affected_rows = 0;
wsrep_replicate_GTID = false;
wsrep_skip_wsrep_GTID = false;
+ wsrep_killed = false;
#endif /* WITH_WSREP */
if (variables.sql_log_bin)
@@ -1985,7 +1987,6 @@ int THD::killed_errno()
DBUG_RETURN(0); // Keep compiler happy
}
-
/*
Remember the location of thread info, the structure needed for
sql_alloc() and the structure for the net buffer
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 7ca3896a69d..0fb563572e0 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -3025,7 +3025,6 @@ public:
void close_active_vio();
#endif
void awake(killed_state state_to_set);
-
/** Disconnect the associated communication endpoint. */
void disconnect();
@@ -3539,6 +3538,14 @@ public:
killed_err= 0;
mysql_mutex_unlock(&LOCK_thd_kill);
}
+
+#ifdef WITH_WSREP
+ mysql_mutex_assert_not_owner(&LOCK_thd_data);
+ mysql_mutex_lock(&LOCK_thd_data);
+ wsrep_killed= false;
+ mysql_mutex_unlock(&LOCK_thd_data);
+#endif /* WITH_WSREP */
+
}
inline void reset_kill_query()
{
@@ -4152,6 +4159,7 @@ public:
*/
bool wsrep_ignore_table;
wsrep_gtid_t wsrep_sync_wait_gtid;
+ bool wsrep_killed; // protected by LOCK_thd_data
ulong wsrep_affected_rows;
bool wsrep_replicate_GTID;
bool wsrep_skip_wsrep_GTID;
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 656da3b6a79..765cca31e81 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -8324,8 +8324,24 @@ kill_one_thread(THD *thd, longlong id, killed_state kill_signal, killed_type typ
thd->security_ctx->user_matches(tmp->security_ctx)) &&
!wsrep_thd_is_BF(tmp, false))
{
- tmp->awake(kill_signal);
- error=0;
+#ifdef WITH_WSREP
+ DEBUG_SYNC(thd, "before_awake_no_mutex");
+
+ // Note that find_thread_by_id will lock tmp->LOCK_thd_data
+ if (wsrep_thd_set_wsrep_killed(tmp))
+ {
+ WSREP_DEBUG("Kill transaction thread %llu skipped due to wsrep_killed", tmp->thread_id);
+ error= 0;
+ }
+ else
+#endif /* WITH_WSREP */
+ {
+ WSREP_DEBUG("kill_one_thread %llu, victim: %llu wsrep_killed %d by signal %d",
+ thd->thread_id, id, tmp->wsrep_killed, kill_signal);
+ tmp->awake(kill_signal);
+ WSREP_DEBUG("victim: %llu taken care of", id);
+ error= 0;
+ }
}
else
error= (type == KILL_TYPE_QUERY ? ER_KILL_QUERY_DENIED_ERROR :
diff --git a/sql/sql_plugin_services.ic b/sql/sql_plugin_services.ic
index 6cf2a31f76b..96052668bf9 100644
--- a/sql/sql_plugin_services.ic
+++ b/sql/sql_plugin_services.ic
@@ -181,7 +181,8 @@ static struct wsrep_service_st wsrep_handler = {
wsrep_trx_is_aborting,
wsrep_trx_order_before,
wsrep_unlock_rollback,
- wsrep_set_data_home_dir
+ wsrep_set_data_home_dir,
+ wsrep_thd_set_wsrep_killed
};
static struct thd_specifics_service_st thd_specifics_handler=
diff --git a/sql/wsrep_dummy.cc b/sql/wsrep_dummy.cc
index 364ef2d3e7a..0041eec084c 100644
--- a/sql/wsrep_dummy.cc
+++ b/sql/wsrep_dummy.cc
@@ -1,4 +1,4 @@
-/* Copyright (C) 2014 SkySQL Ab.
+/* Copyright (C) 2014, 2020, MariaDB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -101,8 +101,8 @@ enum wsrep_conflict_state wsrep_thd_get_conflict_state(THD *)
my_bool wsrep_thd_is_wsrep(THD *)
{ return 0; }
-char *wsrep_thd_query(THD *)
-{ return 0; }
+const char *wsrep_thd_query(THD *)
+{ return "NULL"; }
enum wsrep_query_state wsrep_thd_query_state(THD *)
{ return QUERY_IDLE; }
@@ -145,3 +145,6 @@ void wsrep_set_data_home_dir(const char *)
void wsrep_log(void (*)(const char *, ...), const char *, ...)
{
}
+
+bool wsrep_thd_set_wsrep_killed(THD*)
+{ return false;}
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index 38f8ca413db..af6b3deca78 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -2601,12 +2601,14 @@ wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd)
void wsrep_thd_LOCK(THD *thd)
{
+ mysql_mutex_assert_not_owner(&thd->LOCK_thd_data);
mysql_mutex_lock(&thd->LOCK_thd_data);
}
void wsrep_thd_UNLOCK(THD *thd)
{
+ mysql_mutex_assert_owner(&thd->LOCK_thd_data);
mysql_mutex_unlock(&thd->LOCK_thd_data);
}
@@ -2634,9 +2636,9 @@ extern "C" query_id_t wsrep_thd_query_id(THD *thd)
}
-char *wsrep_thd_query(THD *thd)
+const char *wsrep_thd_query(THD *thd)
{
- return (thd) ? thd->query() : NULL;
+ return (thd) ? thd->query() : "NULL";
}
@@ -2654,11 +2656,10 @@ extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id)
extern "C" void wsrep_thd_awake(THD *thd, my_bool signal)
{
+ mysql_mutex_assert_owner(&thd->LOCK_thd_data);
if (signal)
{
- mysql_mutex_lock(&thd->LOCK_thd_data);
thd->awake(KILL_QUERY);
- mysql_mutex_unlock(&thd->LOCK_thd_data);
}
else
{
@@ -2666,6 +2667,8 @@ extern "C" void wsrep_thd_awake(THD *thd, my_bool signal)
mysql_cond_broadcast(&COND_wsrep_replaying);
mysql_mutex_unlock(&LOCK_wsrep_replaying);
}
+
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
}
@@ -2930,3 +2933,14 @@ bool wsrep_node_is_synced()
{
return (WSREP_ON) ? (wsrep_config_state.get_status() == 4) : false;
}
+
+bool wsrep_thd_set_wsrep_killed(THD *thd)
+{
+ mysql_mutex_assert_owner(&thd->LOCK_thd_data);
+ if (thd->wsrep_killed)
+ {
+ return true;
+ }
+ thd->wsrep_killed= true;
+ return false;
+}
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index 2dd20927545..5a870853cf6 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -18542,235 +18542,323 @@ static struct st_mysql_storage_engine innobase_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
#ifdef WITH_WSREP
-void
-wsrep_abort_slave_trx(wsrep_seqno_t bf_seqno, wsrep_seqno_t victim_seqno)
-{
- WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be "
- "caused by:\n\t"
- "1) unsupported configuration options combination, please check documentation.\n\t"
- "2) a bug in the code.\n\t"
- "3) a database corruption.\n Node consistency compromized, "
- "need to abort. Restart the node to resync with cluster.",
- (long long)bf_seqno, (long long)victim_seqno);
- abort();
-}
-/*******************************************************************//**
-This function is used to kill one transaction in BF. */
+static void wsrep_abort_slave_trx(long long bf_seqno, long long victim_seqno)
+{
+ WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be "
+ "caused by:\n\t"
+ "1) unsupported configuration options combination, please check documentation.\n\t"
+ "2) a bug in the code.\n\t"
+ "3) a database corruption.\n Node consistency compromized, "
+ "need to abort. Restart the node to resync with cluster.",
+ bf_seqno, victim_seqno);
+ abort();
+}
+
+/** This function is used to kill one transaction.
+
+This transaction was open on this node (not-yet-committed), and a
+conflicting writeset from some other node that was being applied
+caused a locking conflict. First committed (from other node)
+wins, thus open transaction is rolled back. BF stands for
+brute-force: any transaction can get aborted by galera any time
+it is necessary.
+
+This conflict can happen only when the replicated writeset (from
+other node) is being applied, not when it’s waiting in the queue.
+If our local transaction reached its COMMIT and this conflicting
+writeset was in the queue, then it should fail the local
+certification test instead.
+
+A brute force abort is only triggered by a locking conflict
+between a writeset being applied by an applier thread (slave thread)
+and an open transaction on the node, not by a Galera writeset
+comparison as in the local certification failure.
+
+@param[in] bf_thd Brute force (BF) thread
+@param[in,out] victim_trx Victim trx to be killed
+@param[in] signal Should victim be signaled */
UNIV_INTERN
int
wsrep_innobase_kill_one_trx(
- void * const bf_thd_ptr,
- const trx_t * const bf_trx,
+ THD* bf_thd,
trx_t *victim_trx,
- ibool signal)
+ bool signal)
{
- ut_ad(lock_mutex_own());
- ut_ad(trx_mutex_own(victim_trx));
- ut_ad(bf_thd_ptr);
- ut_ad(victim_trx);
+ ut_ad(bf_thd);
+ ut_ad(victim_trx);
+ ut_ad(lock_mutex_own());
+ ut_ad(trx_mutex_own(victim_trx));
DBUG_ENTER("wsrep_innobase_kill_one_trx");
- THD *bf_thd = bf_thd_ptr ? (THD*) bf_thd_ptr : NULL;
- THD *thd = (THD *) victim_trx->mysql_thd;
- int64_t bf_seqno = (bf_thd) ? wsrep_thd_trx_seqno(bf_thd) : 0;
- if (!thd) {
- DBUG_PRINT("wsrep", ("no thd for conflicting lock"));
- WSREP_WARN("no THD for trx: " TRX_ID_FMT, victim_trx->id);
- DBUG_RETURN(1);
- }
+ THD *thd= (THD *) victim_trx->mysql_thd;
+ ut_ad(thd);
+ /* Note that bf_trx might not exist here e.g. on MDL conflict
+ case (test: galera_concurrent_ctas). Similarly, BF thread
+ could be also acquiring MDL-lock causing victim to be
+ aborted. However, we have not yet called innobase_trx_init()
+ for BF transaction (test: galera_many_columns) */
+ trx_t* bf_trx= thd_to_trx(bf_thd);
- if (!bf_thd) {
- DBUG_PRINT("wsrep", ("no BF thd for conflicting lock"));
- WSREP_WARN("no BF THD for trx: " TRX_ID_FMT,
- bf_trx ? bf_trx->id : 0);
- DBUG_RETURN(1);
- }
+ DBUG_EXECUTE_IF("sync.before_wsrep_thd_abort",
+ {
+ const char act[]=
+ "now "
+ "SIGNAL sync.before_wsrep_thd_abort_reached "
+ "WAIT_FOR signal.before_wsrep_thd_abort";
+ DBUG_ASSERT(!debug_sync_set_action(bf_thd,
+ STRING_WITH_LEN(act)));
+ };);
- WSREP_LOG_CONFLICT(bf_thd, thd, TRUE);
+ wsrep_thd_LOCK(thd);
- WSREP_DEBUG("BF kill (%lu, seqno: %lld), victim: (%lu) trx: "
- TRX_ID_FMT,
- signal, (long long)bf_seqno,
- thd_get_thread_id(thd),
- victim_trx->id);
+ if (wsrep_thd_set_wsrep_killed(thd))
+ {
+ WSREP_DEBUG("innodb kill transaction skipped due to wsrep_killed");
+ wsrep_thd_UNLOCK(thd);
+ DBUG_RETURN(0);
+ }
- WSREP_DEBUG("Aborting query: %s conf %d trx: %" PRId64,
- (thd && wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void",
- wsrep_thd_conflict_state(thd, FALSE),
- wsrep_thd_ws_handle(thd)->trx_id);
-
- wsrep_thd_LOCK(thd);
- DBUG_EXECUTE_IF("sync.wsrep_after_BF_victim_lock",
- {
- const char act[]=
- "now "
- "wait_for signal.wsrep_after_BF_victim_lock";
- DBUG_ASSERT(!debug_sync_set_action(bf_thd,
- STRING_WITH_LEN(act)));
- };);
+ unsigned long long victim_trx_id= static_cast<unsigned long long>(victim_trx->id);
+ unsigned long long bf_trx_id= bf_trx ?
+ static_cast<unsigned long long>(bf_trx->id) :
+ TRX_ID_MAX;
+ unsigned long victim_thread= thd_get_thread_id(thd);
+ long long victim_seqno= wsrep_thd_trx_seqno(thd);
+ unsigned long bf_thread= thd_get_thread_id(bf_thd);
+ long long bf_seqno= wsrep_thd_trx_seqno(bf_thd);
+ WSREP_LOG_CONFLICT(bf_thd, thd, TRUE);
- if (wsrep_thd_query_state(thd) == QUERY_EXITING) {
- WSREP_DEBUG("kill trx EXITING for " TRX_ID_FMT,
- victim_trx->id);
+ WSREP_DEBUG("Aborter %s trx_id: %llu thread: %ld "
+ "seqno: %lld query_state: %s conflict_state: %s exec %s query: %s",
+ wsrep_thd_is_BF(bf_thd, false) ? "BF" : "normal",
+ bf_trx_id,
+ bf_thread,
+ bf_seqno,
+ wsrep_thd_query_state_str(bf_thd),
+ wsrep_thd_conflict_state_str(bf_thd),
+ wsrep_thd_exec_mode_str(bf_thd),
+ wsrep_thd_query(bf_thd));
+
+ WSREP_DEBUG("Victim %s trx_id: %llu thread: %ld "
+ "seqno: %lld query_state: %s conflict_state: %s exec %s query: %s",
+ wsrep_thd_is_BF(thd, false) ? "BF" : "normal",
+ victim_trx_id,
+ victim_thread,
+ victim_seqno,
+ wsrep_thd_query_state_str(thd),
+ wsrep_thd_conflict_state_str(thd),
+ wsrep_thd_exec_mode_str(thd),
+ wsrep_thd_query(thd));
+
+ if (wsrep_thd_query_state(thd) == QUERY_EXITING)
+ {
+ WSREP_DEBUG("Victim query state QUERY_EXITING trx: %llu"
+ " thread: %lu",
+ victim_trx_id,
+ victim_thread);
wsrep_thd_UNLOCK(thd);
DBUG_RETURN(0);
}
- if(wsrep_thd_exec_mode(thd) != LOCAL_STATE) {
- WSREP_DEBUG("withdraw for BF trx: " TRX_ID_FMT ", state: %d",
- victim_trx->id,
- wsrep_thd_get_conflict_state(thd));
+ if (wsrep_thd_exec_mode(thd) != LOCAL_STATE)
+ {
+ WSREP_DEBUG("Victim withdraw of non local for BF trx: %llu "
+ ", thread: %lu exec_mode: %s",
+ victim_trx_id,
+ victim_thread,
+ wsrep_thd_exec_mode_str(thd));
}
- switch (wsrep_thd_get_conflict_state(thd)) {
+ switch (wsrep_thd_get_conflict_state(thd))
+ {
case NO_CONFLICT:
+ {
+ WSREP_DEBUG("Victim thread: %lu trx: %llu in NO_CONFLICT state",
+ victim_thread,
+ victim_trx_id);
wsrep_thd_set_conflict_state(thd, MUST_ABORT);
break;
- case MUST_ABORT:
- WSREP_DEBUG("victim " TRX_ID_FMT " in MUST ABORT state",
- victim_trx->id);
- wsrep_thd_UNLOCK(thd);
+ }
+ case MUST_ABORT:
+ {
+ WSREP_DEBUG("Victim thread: %lu trx: %llu in MUST_ABORT state",
+ victim_thread,
+ victim_trx_id);
wsrep_thd_awake(thd, signal);
DBUG_RETURN(0);
break;
+ }
case ABORTED:
case ABORTING: // fall through
default:
- WSREP_DEBUG("victim " TRX_ID_FMT " in state %d",
- victim_trx->id, wsrep_thd_get_conflict_state(thd));
+ {
+ WSREP_DEBUG("Victim thread: %lu trx: %llu in state: %s",
+ victim_thread,
+ victim_trx_id,
+ wsrep_thd_conflict_state_str(thd));
wsrep_thd_UNLOCK(thd);
DBUG_RETURN(0);
break;
}
+ }
- switch (wsrep_thd_query_state(thd)) {
+ switch (wsrep_thd_query_state(thd))
+ {
case QUERY_COMMITTING:
- enum wsrep_status rcode;
+ {
+ enum wsrep_status rcode=WSREP_OK;
- WSREP_DEBUG("kill query for: %ld",
- thd_get_thread_id(thd));
- WSREP_DEBUG("kill trx QUERY_COMMITTING for " TRX_ID_FMT,
- victim_trx->id);
+ WSREP_DEBUG("Victim kill trx QUERY_COMMITTING state thread: %ld trx: %llu",
+ victim_thread,
+ victim_trx_id);
- if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
- wsrep_abort_slave_trx(bf_seqno,
- wsrep_thd_trx_seqno(thd));
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV)
+ {
+ WSREP_DEBUG("Victim REPL_RECV abort slave thread: %ld trx: %llu"
+ " bf_seqno: %lld victim_seqno: %lld",
+ victim_thread,
+ victim_trx_id,
+ bf_seqno,
+ victim_seqno);
+
+ wsrep_abort_slave_trx(bf_seqno, victim_seqno);
} else {
wsrep_t *wsrep= get_wsrep();
- rcode = wsrep->abort_pre_commit(
- wsrep, bf_seqno,
- (wsrep_trx_id_t)wsrep_thd_ws_handle(thd)->trx_id
- );
- switch (rcode) {
+ rcode= wsrep->abort_pre_commit(wsrep, bf_seqno,
+ (wsrep_trx_id_t)wsrep_thd_ws_handle(thd)->trx_id);
+
+ switch (rcode)
+ {
case WSREP_WARNING:
- WSREP_DEBUG("cancel commit warning: "
- TRX_ID_FMT,
- victim_trx->id);
- wsrep_thd_UNLOCK(thd);
+ {
+ WSREP_DEBUG("Victim cancel commit warning thread: %lu trx: %llu",
+ victim_thread,
+ victim_trx_id);
+
wsrep_thd_awake(thd, signal);
DBUG_RETURN(1);
break;
+ }
case WSREP_OK:
break;
default:
- WSREP_ERROR(
- "cancel commit bad exit: %d "
- TRX_ID_FMT,
- rcode,
- victim_trx->id);
+ {
+ WSREP_ERROR("Victim cancel commit bad commit exit thread: "
+ "%lu trx: %llu rcode: %d ",
+ victim_thread,
+ victim_trx_id,
+ rcode);
/* unable to interrupt, must abort */
/* note: kill_mysql() will block, if we cannot.
- * kill the lock holder first.
- */
+ * kill the lock holder first. */
abort();
break;
}
+ }
}
- wsrep_thd_UNLOCK(thd);
+
wsrep_thd_awake(thd, signal);
break;
+ }
case QUERY_EXEC:
+ {
/* it is possible that victim trx is itself waiting for some
- * other lock. We need to cancel this waiting
- */
- WSREP_DEBUG("kill trx QUERY_EXEC for " TRX_ID_FMT,
- victim_trx->id);
-
- victim_trx->lock.was_chosen_as_deadlock_victim= TRUE;
- if (victim_trx->lock.wait_lock) {
- WSREP_DEBUG("victim has wait flag: %ld",
- thd_get_thread_id(thd));
- lock_t* wait_lock = victim_trx->lock.wait_lock;
- if (wait_lock) {
- WSREP_DEBUG("canceling wait lock");
- victim_trx->lock.was_chosen_as_deadlock_victim= TRUE;
- lock_cancel_waiting_and_release(wait_lock);
- }
+ * other lock. We need to cancel this waiting */
+ WSREP_DEBUG("Victim kill trx QUERY_EXEC state thread: %ld trx: %llu",
+ victim_thread, victim_trx_id);
- wsrep_thd_UNLOCK(thd);
+ lock_t* wait_lock = victim_trx->lock.wait_lock;
+
+ if (wait_lock)
+ {
+ WSREP_DEBUG("Victim thread: %lu trx: %llu has lock wait flag",
+ victim_thread,
+ victim_trx_id);
+
+ victim_trx->lock.was_chosen_as_deadlock_victim= TRUE;
+ lock_cancel_waiting_and_release(wait_lock);
wsrep_thd_awake(thd, signal);
} else {
- /* abort currently executing query */
- DBUG_PRINT("wsrep",("sending KILL_QUERY to: %lu",
- thd_get_thread_id(thd)));
- WSREP_DEBUG("kill query for: %ld",
- thd_get_thread_id(thd));
- /* Note that innobase_kill_query will take lock_mutex
- and trx_mutex */
- wsrep_thd_UNLOCK(thd);
+ /* Abort currently executing query */
+ WSREP_DEBUG("Kill query for victim thread: %lu trx: %llu",
+ victim_thread,
+ victim_trx_id);
+
wsrep_thd_awake(thd, signal);
/* for BF thd, we need to prevent him from committing */
- if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
- wsrep_abort_slave_trx(bf_seqno,
- wsrep_thd_trx_seqno(thd));
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV)
+ {
+ WSREP_DEBUG("Victim REPL_RECV abort slave for thread: "
+ "%lu trx: %llu"
+ " bf_seqno: %lld victim_seqno: %lld",
+ victim_thread,
+ victim_trx_id,
+ bf_seqno,
+ victim_seqno);
+
+ wsrep_abort_slave_trx(bf_seqno, victim_seqno);
}
}
break;
+ }
case QUERY_IDLE:
{
- WSREP_DEBUG("kill IDLE for " TRX_ID_FMT, victim_trx->id);
+ WSREP_DEBUG("Victim kill trx QUERY_IDLE state thread: %ld trx: %llu",
+ victim_thread,
+ victim_trx_id);
+
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV)
+ {
+ WSREP_DEBUG("Victim REPL_RECV kill BF IDLE, thread: %ld trx: "
+ "%llu bf_seqno: %lld victim_seqno: %lld",
+ victim_thread,
+ victim_trx_id,
+ bf_seqno,
+ victim_seqno);
- if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
- WSREP_DEBUG("kill BF IDLE, seqno: %lld",
- (long long)wsrep_thd_trx_seqno(thd));
wsrep_thd_UNLOCK(thd);
- wsrep_abort_slave_trx(bf_seqno,
- wsrep_thd_trx_seqno(thd));
+ wsrep_abort_slave_trx(bf_seqno, victim_seqno);
DBUG_RETURN(0);
}
- /* This will lock thd from proceeding after net_read() */
+
+ /* This will lock thd from proceeding after net_read() */
wsrep_thd_set_conflict_state(thd, ABORTING);
wsrep_lock_rollback();
- if (wsrep_aborting_thd_contains(thd)) {
- WSREP_WARN("duplicate thd aborter %lu",
- thd_get_thread_id(thd));
+ if (wsrep_aborting_thd_contains(thd))
+ {
+ WSREP_WARN("Victim is duplicate thd aborter thread: %ld trx: %llu",
+ victim_thread,
+ victim_trx_id);
} else {
wsrep_aborting_thd_enqueue(thd);
- DBUG_PRINT("wsrep",("enqueuing trx abort for %lu",
- thd_get_thread_id(thd)));
- WSREP_DEBUG("enqueuing trx abort for (%lu)",
- thd_get_thread_id(thd));
+ WSREP_DEBUG("Enqueuing victim thread: %ld trx: %llu for abort",
+ victim_thread,
+ victim_trx_id);
}
- DBUG_PRINT("wsrep",("signalling wsrep rollbacker"));
- WSREP_DEBUG("signaling aborter");
wsrep_unlock_rollback();
wsrep_thd_UNLOCK(thd);
break;
}
default:
- WSREP_WARN("bad wsrep query state: %d",
- wsrep_thd_query_state(thd));
+ {
+ WSREP_WARN("Victim thread: %ld trx: %llu in bad wsrep query state: %s",
+ victim_thread,
+ victim_trx_id,
+ wsrep_thd_query_state_str(thd));
+
wsrep_thd_UNLOCK(thd);
+ ut_error;
break;
}
+ }
DBUG_RETURN(0);
}
@@ -18784,9 +18872,10 @@ wsrep_abort_transaction(
my_bool signal)
{
DBUG_ENTER("wsrep_innobase_abort_thd");
-
+ ut_ad(bf_thd);
+ ut_ad(victim_thd);
+
trx_t* victim_trx = thd_to_trx(victim_thd);
- trx_t* bf_trx = (bf_thd) ? thd_to_trx(bf_thd) : NULL;
WSREP_DEBUG("abort transaction: BF: %s victim: %s victim conf: %d",
wsrep_thd_query(bf_thd),
@@ -18797,18 +18886,17 @@ wsrep_abort_transaction(
lock_mutex_enter();
trx_mutex_enter(victim_trx);
victim_trx->abort_type = TRX_WSREP_ABORT;
- int rcode = wsrep_innobase_kill_one_trx(bf_thd, bf_trx,
+ int rcode = wsrep_innobase_kill_one_trx(bf_thd,
victim_trx, signal);
+ victim_trx->abort_type = TRX_SERVER_ABORT;
trx_mutex_exit(victim_trx);
lock_mutex_exit();
- victim_trx->abort_type = TRX_SERVER_ABORT;
wsrep_srv_conc_cancel_wait(victim_trx);
DBUG_RETURN(rcode);
} else {
WSREP_DEBUG("victim does not have transaction");
wsrep_thd_LOCK(victim_thd);
wsrep_thd_set_conflict_state(victim_thd, MUST_ABORT);
- wsrep_thd_UNLOCK(victim_thd);
wsrep_thd_awake(victim_thd, signal);
}
diff --git a/storage/innobase/include/ha_prototypes.h b/storage/innobase/include/ha_prototypes.h
index bcf8a893695..84cde3c93d2 100644
--- a/storage/innobase/include/ha_prototypes.h
+++ b/storage/innobase/include/ha_prototypes.h
@@ -284,10 +284,11 @@ innobase_casedn_str(
#ifdef WITH_WSREP
UNIV_INTERN
int
-wsrep_innobase_kill_one_trx(void * const thd_ptr,
- const trx_t * const bf_trx,
- trx_t *victim_trx,
- ibool signal);
+wsrep_innobase_kill_one_trx(
+ THD* bf_thd,
+ trx_t *victim_trx,
+ bool signal);
+
int wsrep_innobase_mysql_sort(int mysql_type, uint charset_number,
unsigned char* str, unsigned int str_length,
unsigned int buf_length);
diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc
index 0c4e40067d1..3873d557afc 100644
--- a/storage/innobase/lock/lock0lock.cc
+++ b/storage/innobase/lock/lock0lock.cc
@@ -1809,7 +1809,7 @@ wsrep_kill_victim(
lock->trx->abort_type = TRX_WSREP_ABORT;
wsrep_innobase_kill_one_trx(trx->mysql_thd,
- (const trx_t*) trx, lock->trx, TRUE);
+ lock->trx, true);
lock->trx->abort_type = TRX_SERVER_ABORT;
}
}
diff --git a/storage/xtradb/handler/ha_innodb.cc b/storage/xtradb/handler/ha_innodb.cc
index 0a77ce56eaf..bc6e44b7988 100644
--- a/storage/xtradb/handler/ha_innodb.cc
+++ b/storage/xtradb/handler/ha_innodb.cc
@@ -19519,264 +19519,362 @@ static struct st_mysql_storage_engine innobase_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
#ifdef WITH_WSREP
-void
-wsrep_abort_slave_trx(wsrep_seqno_t bf_seqno, wsrep_seqno_t victim_seqno)
-{
- WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be "
- "caused by:\n\t"
- "1) unsupported configuration options combination, please check documentation.\n\t"
- "2) a bug in the code.\n\t"
- "3) a database corruption.\n Node consistency compromized, "
- "need to abort. Restart the node to resync with cluster.",
- (long long)bf_seqno, (long long)victim_seqno);
- abort();
-}
-/*******************************************************************//**
-This function is used to kill one transaction in BF. */
+
+static void wsrep_abort_slave_trx(long long bf_seqno, long long victim_seqno)
+{
+ WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be "
+ "caused by:\n\t"
+ "1) unsupported configuration options combination, please check documentation.\n\t"
+ "2) a bug in the code.\n\t"
+ "3) a database corruption.\n Node consistency compromized, "
+ "need to abort. Restart the node to resync with cluster.",
+ bf_seqno, victim_seqno);
+ abort();
+}
+
+/** This function is used to kill one transaction.
+
+This transaction was open on this node (not-yet-committed), and a
+conflicting writeset from some other node that was being applied
+caused a locking conflict. First committed (from other node)
+wins, thus open transaction is rolled back. BF stands for
+brute-force: any transaction can get aborted by galera any time
+it is necessary.
+
+This conflict can happen only when the replicated writeset (from
+other node) is being applied, not when it’s waiting in the queue.
+If our local transaction reached its COMMIT and this conflicting
+writeset was in the queue, then it should fail the local
+certification test instead.
+
+A brute force abort is only triggered by a locking conflict
+between a writeset being applied by an applier thread (slave thread)
+and an open transaction on the node, not by a Galera writeset
+comparison as in the local certification failure.
+
+@param[in] bf_thd Brute force (BF) thread
+@param[in,out] victim_trx Victim trx to be killed
+@param[in] signal Should victim be signaled */
UNIV_INTERN
int
wsrep_innobase_kill_one_trx(
- void * const bf_thd_ptr,
- const trx_t * const bf_trx,
+ THD* bf_thd,
trx_t *victim_trx,
- ibool signal)
+ bool signal)
{
- ut_ad(lock_mutex_own());
- ut_ad(trx_mutex_own(victim_trx));
- ut_ad(bf_thd_ptr);
- ut_ad(victim_trx);
+ ut_ad(bf_thd);
+ ut_ad(victim_trx);
+ ut_ad(lock_mutex_own());
+ ut_ad(trx_mutex_own(victim_trx));
DBUG_ENTER("wsrep_innobase_kill_one_trx");
- THD *bf_thd = bf_thd_ptr ? (THD*) bf_thd_ptr : NULL;
- THD *thd = (THD *) victim_trx->mysql_thd;
- int64_t bf_seqno = (bf_thd) ? wsrep_thd_trx_seqno(bf_thd) : 0;
- if (!thd) {
- DBUG_PRINT("wsrep", ("no thd for conflicting lock"));
- WSREP_WARN("no THD for trx: " TRX_ID_FMT, victim_trx->id);
- DBUG_RETURN(1);
- }
+ THD *thd= (THD *) victim_trx->mysql_thd;
+ ut_ad(thd);
+ /* Note that bf_trx might not exist here, e.g. in the MDL conflict
+ case (test: galera_concurrent_ctas). Similarly, the BF thread
+ could be acquiring an MDL lock that causes the victim to be
+ aborted before innobase_trx_init() has been called for the
+ BF transaction (test: galera_many_columns). */
+ trx_t* bf_trx= thd_to_trx(bf_thd);
- if (!bf_thd) {
- DBUG_PRINT("wsrep", ("no BF thd for conflicting lock"));
- WSREP_WARN("no BF THD for trx: " TRX_ID_FMT,
- bf_trx ? bf_trx->id : 0);
- DBUG_RETURN(1);
- }
+ DBUG_EXECUTE_IF("sync.before_wsrep_thd_abort",
+ {
+ const char act[]=
+ "now "
+ "SIGNAL sync.before_wsrep_thd_abort_reached "
+ "WAIT_FOR signal.before_wsrep_thd_abort";
+ DBUG_ASSERT(!debug_sync_set_action(bf_thd,
+ STRING_WITH_LEN(act)));
+ };);
- WSREP_LOG_CONFLICT(bf_thd, thd, TRUE);
+ wsrep_thd_LOCK(thd);
- WSREP_DEBUG("BF kill (%lu, seqno: %lld), victim: (%lu) trx: "
- TRX_ID_FMT,
- signal, (long long)bf_seqno,
- thd_get_thread_id(thd),
- victim_trx->id);
+ if (wsrep_thd_set_wsrep_killed(thd))
+ {
+ WSREP_DEBUG("innodb kill transaction skipped due to wsrep_killed");
+ wsrep_thd_UNLOCK(thd);
+ DBUG_RETURN(0);
+ }
- WSREP_DEBUG("Aborting query: %s",
- (thd && wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void");
-
- wsrep_thd_LOCK(thd);
- DBUG_EXECUTE_IF("sync.wsrep_after_BF_victim_lock",
- {
- const char act[]=
- "now "
- "wait_for signal.wsrep_after_BF_victim_lock";
- DBUG_ASSERT(!debug_sync_set_action(bf_thd,
- STRING_WITH_LEN(act)));
- };);
+ unsigned long long victim_trx_id= static_cast<unsigned long long>(victim_trx->id);
+ unsigned long long bf_trx_id= bf_trx ?
+ static_cast<unsigned long long>(bf_trx->id) :
+ TRX_ID_MAX;
+ unsigned long victim_thread= thd_get_thread_id(thd);
+ long long victim_seqno= wsrep_thd_trx_seqno(thd);
+ unsigned long bf_thread= thd_get_thread_id(bf_thd);
+ long long bf_seqno= wsrep_thd_trx_seqno(bf_thd);
+ WSREP_LOG_CONFLICT(bf_thd, thd, TRUE);
- if (wsrep_thd_query_state(thd) == QUERY_EXITING) {
- WSREP_DEBUG("kill trx EXITING for " TRX_ID_FMT,
- victim_trx->id);
+ WSREP_DEBUG("Aborter %s trx_id: %llu thread: %ld "
+ "seqno: %lld query_state: %s conflict_state: %s exec %s query: %s",
+ wsrep_thd_is_BF(bf_thd, false) ? "BF" : "normal",
+ bf_trx_id,
+ bf_thread,
+ bf_seqno,
+ wsrep_thd_query_state_str(bf_thd),
+ wsrep_thd_conflict_state_str(bf_thd),
+ wsrep_thd_exec_mode_str(bf_thd),
+ wsrep_thd_query(bf_thd));
+
+ WSREP_DEBUG("Victim %s trx_id: %llu thread: %ld "
+ "seqno: %lld query_state: %s conflict_state: %s exec %s query: %s",
+ wsrep_thd_is_BF(thd, false) ? "BF" : "normal",
+ victim_trx_id,
+ victim_thread,
+ victim_seqno,
+ wsrep_thd_query_state_str(thd),
+ wsrep_thd_conflict_state_str(thd),
+ wsrep_thd_exec_mode_str(thd),
+ wsrep_thd_query(thd));
+
+ if (wsrep_thd_query_state(thd) == QUERY_EXITING)
+ {
+ WSREP_DEBUG("Victim query state QUERY_EXITING trx: %llu"
+ " thread: %lu",
+ victim_trx_id,
+ victim_thread);
wsrep_thd_UNLOCK(thd);
DBUG_RETURN(0);
}
- if(wsrep_thd_exec_mode(thd) != LOCAL_STATE) {
- WSREP_DEBUG("withdraw for BF trx: " TRX_ID_FMT ", state: %d",
- victim_trx->id,
- wsrep_thd_get_conflict_state(thd));
+ if (wsrep_thd_exec_mode(thd) != LOCAL_STATE)
+ {
+ WSREP_DEBUG("Victim withdraw of non local for BF trx: %llu "
+ ", thread: %lu exec_mode: %s",
+ victim_trx_id,
+ victim_thread,
+ wsrep_thd_exec_mode_str(thd));
}
- switch (wsrep_thd_get_conflict_state(thd)) {
+ switch (wsrep_thd_get_conflict_state(thd))
+ {
case NO_CONFLICT:
+ {
+ WSREP_DEBUG("Victim thread: %lu trx: %llu in NO_CONFLICT state",
+ victim_thread,
+ victim_trx_id);
wsrep_thd_set_conflict_state(thd, MUST_ABORT);
break;
- case MUST_ABORT:
- WSREP_DEBUG("victim " TRX_ID_FMT " in MUST ABORT state",
- victim_trx->id);
- wsrep_thd_UNLOCK(thd);
+ }
+ case MUST_ABORT:
+ {
+ WSREP_DEBUG("Victim thread: %lu trx: %llu in MUST_ABORT state",
+ victim_thread,
+ victim_trx_id);
wsrep_thd_awake(thd, signal);
DBUG_RETURN(0);
break;
+ }
case ABORTED:
case ABORTING: // fall through
default:
- WSREP_DEBUG("victim " TRX_ID_FMT " in state %d",
- victim_trx->id, wsrep_thd_get_conflict_state(thd));
+ {
+ WSREP_DEBUG("Victim thread: %lu trx: %llu in state: %s",
+ victim_thread,
+ victim_trx_id,
+ wsrep_thd_conflict_state_str(thd));
wsrep_thd_UNLOCK(thd);
DBUG_RETURN(0);
break;
}
+ }
- switch (wsrep_thd_query_state(thd)) {
+ switch (wsrep_thd_query_state(thd))
+ {
case QUERY_COMMITTING:
- enum wsrep_status rcode;
+ {
+ enum wsrep_status rcode=WSREP_OK;
- WSREP_DEBUG("kill query for: %ld",
- thd_get_thread_id(thd));
- WSREP_DEBUG("kill trx QUERY_COMMITTING for " TRX_ID_FMT,
- victim_trx->id);
+ WSREP_DEBUG("Victim kill trx QUERY_COMMITTING state thread: %ld trx: %llu",
+ victim_thread,
+ victim_trx_id);
- if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
- wsrep_abort_slave_trx(bf_seqno,
- wsrep_thd_trx_seqno(thd));
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV)
+ {
+ WSREP_DEBUG("Victim REPL_RECV abort slave thread: %ld trx: %llu"
+ " bf_seqno: %lld victim_seqno: %lld",
+ victim_thread,
+ victim_trx_id,
+ bf_seqno,
+ victim_seqno);
+
+ wsrep_abort_slave_trx(bf_seqno, victim_seqno);
} else {
wsrep_t *wsrep= get_wsrep();
- rcode = wsrep->abort_pre_commit(
- wsrep, bf_seqno,
- (wsrep_trx_id_t)victim_trx->id
- );
- switch (rcode) {
+ rcode= wsrep->abort_pre_commit(wsrep, bf_seqno,
+ (wsrep_trx_id_t)wsrep_thd_ws_handle(thd)->trx_id);
+
+ switch (rcode)
+ {
case WSREP_WARNING:
- WSREP_DEBUG("cancel commit warning: "
- TRX_ID_FMT,
- victim_trx->id);
- wsrep_thd_UNLOCK(thd);
+ {
+ WSREP_DEBUG("Victim cancel commit warning thread: %lu trx: %llu",
+ victim_thread,
+ victim_trx_id);
+
wsrep_thd_awake(thd, signal);
DBUG_RETURN(1);
break;
+ }
case WSREP_OK:
break;
default:
- WSREP_ERROR(
- "cancel commit bad exit: %d "
- TRX_ID_FMT,
- rcode,
- victim_trx->id);
+ {
+ WSREP_ERROR("Victim cancel commit bad commit exit thread: "
+ "%lu trx: %llu rcode: %d ",
+ victim_thread,
+ victim_trx_id,
+ rcode);
/* unable to interrupt, must abort */
/* note: kill_mysql() will block, if we cannot.
- * kill the lock holder first.
- */
+ * kill the lock holder first. */
abort();
break;
}
+ }
}
- wsrep_thd_UNLOCK(thd);
+
wsrep_thd_awake(thd, signal);
break;
+ }
case QUERY_EXEC:
+ {
/* it is possible that victim trx is itself waiting for some
- * other lock. We need to cancel this waiting
- */
- WSREP_DEBUG("kill trx QUERY_EXEC for " TRX_ID_FMT,
- victim_trx->id);
-
- victim_trx->lock.was_chosen_as_deadlock_victim= TRUE;
- if (victim_trx->lock.wait_lock) {
- WSREP_DEBUG("victim has wait flag: %ld",
- thd_get_thread_id(thd));
- lock_t* wait_lock = victim_trx->lock.wait_lock;
- if (wait_lock) {
- WSREP_DEBUG("canceling wait lock");
- victim_trx->lock.was_chosen_as_deadlock_victim= TRUE;
- lock_cancel_waiting_and_release(wait_lock);
- }
+ * other lock. We need to cancel this waiting */
+ WSREP_DEBUG("Victim kill trx QUERY_EXEC state thread: %ld trx: %llu",
+ victim_thread, victim_trx_id);
- wsrep_thd_UNLOCK(thd);
+ lock_t* wait_lock = victim_trx->lock.wait_lock;
+
+ if (wait_lock)
+ {
+ WSREP_DEBUG("Victim thread: %lu trx: %llu has lock wait flag",
+ victim_thread,
+ victim_trx_id);
+
+ victim_trx->lock.was_chosen_as_deadlock_victim= TRUE;
+ lock_cancel_waiting_and_release(wait_lock);
wsrep_thd_awake(thd, signal);
} else {
- /* abort currently executing query */
- DBUG_PRINT("wsrep",("sending KILL_QUERY to: %lu",
- thd_get_thread_id(thd)));
- WSREP_DEBUG("kill query for: %ld",
- thd_get_thread_id(thd));
- /* Note that innobase_kill_query will take lock_mutex
- and trx_mutex */
- wsrep_thd_UNLOCK(thd);
+ /* Abort currently executing query */
+ WSREP_DEBUG("Kill query for victim thread: %lu trx: %llu",
+ victim_thread,
+ victim_trx_id);
+
wsrep_thd_awake(thd, signal);
/* for BF thd, we need to prevent him from committing */
- if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
- wsrep_abort_slave_trx(bf_seqno,
- wsrep_thd_trx_seqno(thd));
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV)
+ {
+ WSREP_DEBUG("Victim REPL_RECV abort slave for thread: "
+ "%lu trx: %llu"
+ " bf_seqno: %lld victim_seqno: %lld",
+ victim_thread,
+ victim_trx_id,
+ bf_seqno,
+ victim_seqno);
+
+ wsrep_abort_slave_trx(bf_seqno, victim_seqno);
}
}
break;
+ }
case QUERY_IDLE:
{
- WSREP_DEBUG("kill IDLE for " TRX_ID_FMT, victim_trx->id);
+ WSREP_DEBUG("Victim kill trx QUERY_IDLE state thread: %ld trx: %llu",
+ victim_thread,
+ victim_trx_id);
+
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV)
+ {
+ WSREP_DEBUG("Victim REPL_RECV kill BF IDLE, thread: %ld trx: "
+ "%llu bf_seqno: %lld victim_seqno: %lld",
+ victim_thread,
+ victim_trx_id,
+ bf_seqno,
+ victim_seqno);
- if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
- WSREP_DEBUG("kill BF IDLE, seqno: %lld",
- (long long)wsrep_thd_trx_seqno(thd));
wsrep_thd_UNLOCK(thd);
- wsrep_abort_slave_trx(bf_seqno,
- wsrep_thd_trx_seqno(thd));
+ wsrep_abort_slave_trx(bf_seqno, victim_seqno);
DBUG_RETURN(0);
}
- /* This will lock thd from proceeding after net_read() */
+
+ /* This will lock thd from proceeding after net_read() */
wsrep_thd_set_conflict_state(thd, ABORTING);
wsrep_lock_rollback();
- if (wsrep_aborting_thd_contains(thd)) {
- WSREP_WARN("duplicate thd aborter %lu",
- thd_get_thread_id(thd));
+ if (wsrep_aborting_thd_contains(thd))
+ {
+ WSREP_WARN("Victim is duplicate thd aborter thread: %ld trx: %llu",
+ victim_thread,
+ victim_trx_id);
} else {
wsrep_aborting_thd_enqueue(thd);
- DBUG_PRINT("wsrep",("enqueuing trx abort for %lu",
- thd_get_thread_id(thd)));
- WSREP_DEBUG("enqueuing trx abort for (%lu)",
- thd_get_thread_id(thd));
+ WSREP_DEBUG("Enqueuing victim thread: %ld trx: %llu for abort",
+ victim_thread,
+ victim_trx_id);
}
- DBUG_PRINT("wsrep",("signalling wsrep rollbacker"));
- WSREP_DEBUG("signaling aborter");
wsrep_unlock_rollback();
wsrep_thd_UNLOCK(thd);
break;
}
default:
- WSREP_WARN("bad wsrep query state: %d",
- wsrep_thd_query_state(thd));
+ {
+ WSREP_WARN("Victim thread: %ld trx: %llu in bad wsrep query state: %s",
+ victim_thread,
+ victim_trx_id,
+ wsrep_thd_query_state_str(thd));
+
wsrep_thd_UNLOCK(thd);
+ ut_error;
break;
}
+ }
DBUG_RETURN(0);
}
-static int
-wsrep_abort_transaction(handlerton* hton, THD *bf_thd, THD *victim_thd,
- my_bool signal)
+static
+int
+wsrep_abort_transaction(
+ handlerton* hton,
+ THD *bf_thd,
+ THD *victim_thd,
+ my_bool signal)
{
DBUG_ENTER("wsrep_innobase_abort_thd");
- trx_t* victim_trx = thd_to_trx(victim_thd);
- trx_t* bf_trx = (bf_thd) ? thd_to_trx(bf_thd) : NULL;
- WSREP_DEBUG("abort transaction: BF: %s victim: %s",
- wsrep_thd_query(bf_thd),
- wsrep_thd_query(victim_thd));
+ ut_ad(bf_thd);
+ ut_ad(victim_thd);
+
+ trx_t* victim_trx = thd_to_trx(victim_thd);
+
+ WSREP_DEBUG("abort transaction: BF: %s victim: %s victim conf: %d",
+ wsrep_thd_query(bf_thd),
+ wsrep_thd_query(victim_thd),
+ wsrep_thd_conflict_state(victim_thd, FALSE));
if (victim_trx) {
lock_mutex_enter();
trx_mutex_enter(victim_trx);
victim_trx->abort_type = TRX_WSREP_ABORT;
- int rcode = wsrep_innobase_kill_one_trx(bf_thd, bf_trx,
+ int rcode = wsrep_innobase_kill_one_trx(bf_thd,
victim_trx, signal);
+ victim_trx->abort_type = TRX_SERVER_ABORT;
trx_mutex_exit(victim_trx);
lock_mutex_exit();
- victim_trx->abort_type = TRX_SERVER_ABORT;
wsrep_srv_conc_cancel_wait(victim_trx);
DBUG_RETURN(rcode);
} else {
WSREP_DEBUG("victim does not have transaction");
wsrep_thd_LOCK(victim_thd);
wsrep_thd_set_conflict_state(victim_thd, MUST_ABORT);
- wsrep_thd_UNLOCK(victim_thd);
wsrep_thd_awake(victim_thd, signal);
}
diff --git a/storage/xtradb/include/ha_prototypes.h b/storage/xtradb/include/ha_prototypes.h
index e5b545e0727..266b5250a8a 100644
--- a/storage/xtradb/include/ha_prototypes.h
+++ b/storage/xtradb/include/ha_prototypes.h
@@ -303,10 +303,11 @@ innobase_casedn_str(
#ifdef WITH_WSREP
UNIV_INTERN
int
-wsrep_innobase_kill_one_trx(void * const thd_ptr,
- const trx_t * const bf_trx,
- trx_t *victim_trx,
- ibool signal);
+wsrep_innobase_kill_one_trx(
+ THD* bf_thd,
+ trx_t *victim_trx,
+ bool signal);
+
int wsrep_innobase_mysql_sort(int mysql_type, uint charset_number,
unsigned char* str, unsigned int str_length,
unsigned int buf_length);
diff --git a/storage/xtradb/lock/lock0lock.cc b/storage/xtradb/lock/lock0lock.cc
index 4d40111ac20..844d81886da 100644
--- a/storage/xtradb/lock/lock0lock.cc
+++ b/storage/xtradb/lock/lock0lock.cc
@@ -1820,7 +1820,7 @@ wsrep_kill_victim(
lock->trx->abort_type = TRX_WSREP_ABORT;
wsrep_innobase_kill_one_trx(trx->mysql_thd,
- (const trx_t*) trx, lock->trx, TRUE);
+ lock->trx, true);
lock->trx->abort_type = TRX_SERVER_ABORT;
}
}