summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDenis Protivensky <denis.protivensky@galeracluster.com>2022-12-05 17:03:32 +0300
committerJan Lindström <jan.lindstrom@mariadb.com>2023-01-24 12:43:12 +0200
commit653c0c6f5721f5004a32570f5870c0cc8477f1f3 (patch)
tree2efb7f2d00a8a7fa681df133911e398e792a1dc6
parent946b6f0f7d5237eeca811bd177380235c54d11c4 (diff)
downloadmariadb-git-bb-10.7-MDEV-24623.tar.gz
MDEV-24623 Replicate bulk insert as table-level exclusive keybb-10.7-MDEV-24623
- introduce table key construction function in wsrep service interface - don't add row keys when replicating bulk insert - don't start bulk insert on applier or when transaction is not active - don't start bulk insert for system versioned tables - implement actual bulk insert table-level key replication Reviewed-by: Jan Lindström <jan.lindstrom@mariadb.com>
-rw-r--r--include/mysql/service_wsrep.h12
-rw-r--r--mysql-test/suite/galera/r/galera_insert_bulk.result30
-rw-r--r--mysql-test/suite/galera/t/galera_insert_bulk.test88
-rw-r--r--sql/service_wsrep.cc20
-rw-r--r--sql/sql_plugin_services.inl2
-rw-r--r--sql/wsrep_dummy.cc6
-rw-r--r--storage/innobase/handler/ha_innodb.cc17
-rw-r--r--storage/innobase/row/row0ins.cc51
8 files changed, 224 insertions, 2 deletions
diff --git a/include/mysql/service_wsrep.h b/include/mysql/service_wsrep.h
index 42b758c03f3..8541b348ae4 100644
--- a/include/mysql/service_wsrep.h
+++ b/include/mysql/service_wsrep.h
@@ -69,6 +69,9 @@ extern struct wsrep_service_st {
void (*wsrep_thd_self_abort_func)(MYSQL_THD thd);
int (*wsrep_thd_append_key_func)(MYSQL_THD thd, const struct wsrep_key* key,
int n_keys, enum Wsrep_service_key_type);
+ int (*wsrep_thd_append_table_key_func)(MYSQL_THD thd, const char* db,
+ const char* table, enum Wsrep_service_key_type);
+ my_bool (*wsrep_thd_is_local_transaction)(const MYSQL_THD thd);
const char* (*wsrep_thd_client_state_str_func)(const MYSQL_THD thd);
const char* (*wsrep_thd_client_mode_str_func)(const MYSQL_THD thd);
const char* (*wsrep_thd_transaction_state_str_func)(const MYSQL_THD thd);
@@ -121,6 +124,8 @@ extern struct wsrep_service_st {
#define wsrep_thd_is_local(T) wsrep_service->wsrep_thd_is_local_func(T)
#define wsrep_thd_self_abort(T) wsrep_service->wsrep_thd_self_abort_func(T)
#define wsrep_thd_append_key(T,W,N,K) wsrep_service->wsrep_thd_append_key_func(T,W,N,K)
+#define wsrep_thd_append_table_key(T,D,B,K) wsrep_service->wsrep_thd_append_table_key_func(T,D,B,K)
+#define wsrep_thd_is_local_transaction(T) wsrep_service->wsrep_thd_is_local_transaction_func(T)
#define wsrep_thd_client_state_str(T) wsrep_service->wsrep_thd_client_state_str_func(T)
#define wsrep_thd_client_mode_str(T) wsrep_service->wsrep_thd_client_mode_str_func(T)
#define wsrep_thd_transaction_state_str(T) wsrep_service->wsrep_thd_transaction_state_str_func(T)
@@ -226,6 +231,13 @@ extern "C" int wsrep_thd_append_key(MYSQL_THD thd,
int n_keys,
enum Wsrep_service_key_type);
+extern "C" int wsrep_thd_append_table_key(MYSQL_THD thd,
+ const char* db,
+ const char* table,
+ enum Wsrep_service_key_type);
+
+extern "C" my_bool wsrep_thd_is_local_transaction(const MYSQL_THD thd);
+
extern const char* wsrep_sr_table_name_full;
extern "C" const char* wsrep_get_sr_table_name();
diff --git a/mysql-test/suite/galera/r/galera_insert_bulk.result b/mysql-test/suite/galera/r/galera_insert_bulk.result
new file mode 100644
index 00000000000..f4d4adf64e1
--- /dev/null
+++ b/mysql-test/suite/galera/r/galera_insert_bulk.result
@@ -0,0 +1,30 @@
+connection node_2;
+connection node_1;
+connection node_1;
+CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB;
+SET foreign_key_checks = 0;
+SET unique_checks = 0;
+START TRANSACTION;
+connection node_2;
+SET foreign_key_checks = 1;
+SET unique_checks = 1;
+INSERT INTO t1 VALUES (1001);
+connection node_1;
+COMMIT;
+ERROR 40001: Deadlock found when trying to get lock; try restarting transaction
+DROP TABLE t1;
+connection node_1;
+CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB;
+START TRANSACTION;
+connection node_2;
+SET foreign_key_checks = 1;
+SET unique_checks = 1;
+START TRANSACTION;
+INSERT INTO t1 VALUES (1001);
+connection node_1;
+COMMIT;
+2
+connection node_2;
+COMMIT;
+ERROR 40001: Deadlock found when trying to get lock; try restarting transaction
+DROP TABLE t1;
diff --git a/mysql-test/suite/galera/t/galera_insert_bulk.test b/mysql-test/suite/galera/t/galera_insert_bulk.test
new file mode 100644
index 00000000000..f58870d5f74
--- /dev/null
+++ b/mysql-test/suite/galera/t/galera_insert_bulk.test
@@ -0,0 +1,88 @@
+#
+# Test that bulk insert replicates as table-level exclusive key and
+# rolls back properly if needed.
+#
+
+--source include/galera_cluster.inc
+--source include/have_innodb.inc
+
+#
+# Make bulk insert BF-abort, but regular insert succeed.
+#
+
+--connection node_1
+CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB;
+
+# Disable foreign and unique key checks to allow bulk insert.
+SET foreign_key_checks = 0;
+SET unique_checks = 0;
+
+START TRANSACTION;
+
+--let $count=0
+--disable_query_log
+while ($count < 1000)
+{
+ --eval INSERT INTO t1 VALUES ($count)
+ --inc $count
+}
+--enable_query_log
+
+--connection node_2
+
+# Disable bulk insert.
+SET foreign_key_checks = 1;
+SET unique_checks = 1;
+
+# Insert a value out of the bulk insert range.
+INSERT INTO t1 VALUES (1001);
+
+--connection node_1
+--error ER_LOCK_DEADLOCK
+COMMIT;
+
+DROP TABLE t1;
+
+#
+# Make bulk insert succeed, but regular insert BF-abort.
+#
+
+--connection node_1
+CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB;
+
+--let $before_bulk_keys = `SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_repl_keys'`
+
+START TRANSACTION;
+
+--let $count=0
+--disable_query_log
+while ($count < 1000)
+{
+ --eval INSERT INTO t1 VALUES ($count)
+ --inc $count
+}
+--enable_query_log
+
+--connection node_2
+
+# Disable bulk insert.
+SET foreign_key_checks = 1;
+SET unique_checks = 1;
+
+START TRANSACTION;
+
+# Insert a value out of the bulk insert range.
+INSERT INTO t1 VALUES (1001);
+
+--connection node_1
+COMMIT;
+
+# Expect two keys to be added for bulk insert: DB-level shared key and table-level exclusive key.
+--let $bulk_keys_count = `SELECT VARIABLE_VALUE - $before_bulk_keys FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_repl_keys'`
+--echo $bulk_keys_count
+
+--connection node_2
+--error ER_LOCK_DEADLOCK
+COMMIT;
+
+DROP TABLE t1;
diff --git a/sql/service_wsrep.cc b/sql/service_wsrep.cc
index 943db803242..a5fa4f73fd6 100644
--- a/sql/service_wsrep.cc
+++ b/sql/service_wsrep.cc
@@ -417,3 +417,23 @@ extern "C" void wsrep_thd_set_PA_unsafe(THD *thd)
WSREP_DEBUG("session does not have active transaction, can not mark as PA unsafe");
}
}
+
+extern "C" int wsrep_thd_append_table_key(MYSQL_THD thd,
+ const char* db,
+ const char* table,
+ enum Wsrep_service_key_type key_type)
+{
+ wsrep_key_arr_t key_arr = {0, 0};
+ int ret = wsrep_prepare_keys_for_isolation(thd, db, table, NULL, &key_arr);
+ ret = ret || wsrep_thd_append_key(thd, key_arr.keys,
+ (int)key_arr.keys_len, key_type);
+ wsrep_keys_free(&key_arr);
+ return ret;
+}
+
+extern "C" my_bool wsrep_thd_is_local_transaction(const THD *thd)
+{
+ return (wsrep_thd_is_local(thd) &&
+ thd->wsrep_cs().transaction().active());
+}
+
diff --git a/sql/sql_plugin_services.inl b/sql/sql_plugin_services.inl
index e883081e9f7..3a66e982e7b 100644
--- a/sql/sql_plugin_services.inl
+++ b/sql/sql_plugin_services.inl
@@ -162,6 +162,8 @@ static struct wsrep_service_st wsrep_handler = {
wsrep_thd_is_local,
wsrep_thd_self_abort,
wsrep_thd_append_key,
+ wsrep_thd_append_table_key,
+ wsrep_thd_is_local_transaction,
wsrep_thd_client_state_str,
wsrep_thd_client_mode_str,
wsrep_thd_transaction_state_str,
diff --git a/sql/wsrep_dummy.cc b/sql/wsrep_dummy.cc
index ac14fc4597a..9bfaf9285f3 100644
--- a/sql/wsrep_dummy.cc
+++ b/sql/wsrep_dummy.cc
@@ -101,6 +101,12 @@ void wsrep_thd_self_abort(THD *)
int wsrep_thd_append_key(THD *, const struct wsrep_key*, int, enum Wsrep_service_key_type)
{ return 0; }
+int wsrep_thd_append_table_key(THD *, const char*, const char*, enum Wsrep_service_key_type)
+{ return 0; }
+
+my_bool wsrep_thd_is_local_transaction(const THD*)
+{ return 0; }
+
const char* wsrep_thd_client_state_str(const THD*)
{ return 0; }
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index 1a745039c14..c9c4a8e3814 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -8023,6 +8023,7 @@ report_error:
#ifdef WITH_WSREP
if (!error_result && trx->is_wsrep()
+ && !trx->is_bulk_insert()
&& wsrep_thd_is_local(m_user_thd)
&& !wsrep_thd_ignore_table(m_user_thd)
&& !wsrep_consistency_check(m_user_thd)
@@ -8734,6 +8735,9 @@ ha_innobase::delete_row(
DBUG_ENTER("ha_innobase::delete_row");
+ fprintf(stderr, "JAN::delete_row()");
+ fflush(stderr);
+
if (is_read_only()) {
DBUG_RETURN(HA_ERR_TABLE_READONLY);
} else if (!trx_is_started(trx)) {
@@ -8754,10 +8758,13 @@ ha_innobase::delete_row(
error = row_update_for_mysql(m_prebuilt);
+ ut_ad(!trx->is_bulk_insert());
+
#ifdef WITH_WSREP
if (error == DB_SUCCESS && trx->is_wsrep()
&& wsrep_thd_is_local(m_user_thd)
- && !wsrep_thd_ignore_table(m_user_thd)) {
+ && !wsrep_thd_ignore_table(m_user_thd))
+ {
if (wsrep_append_keys(m_user_thd, WSREP_SERVICE_KEY_EXCLUSIVE,
record,
NULL)) {
@@ -10126,6 +10133,8 @@ wsrep_append_key(
(shared, exclusive, semi...) */
)
{
+ ut_ad(!trx->is_bulk_insert());
+
DBUG_ENTER("wsrep_append_key");
DBUG_PRINT("enter",
("thd: %lu trx: %lld", thd_get_thread_id(thd),
@@ -15680,6 +15689,8 @@ ha_innobase::extra(
trx->bulk_insert_apply();
trx->end_bulk_insert(*m_prebuilt->table);
trx->bulk_insert = false;
+ fprintf(stderr, "JAN::: end bulk insert\n");
+ fflush(stderr);
break;
case HA_EXTRA_NO_KEYREAD:
m_prebuilt->read_just_key = 0;
@@ -15822,6 +15833,8 @@ ha_innobase::start_stmt(
trx->end_bulk_insert();
trx->bulk_insert = false;
trx->last_sql_stat_start.least_undo_no = trx->undo_no;
+ fprintf(stderr, "JAN::: end bulk insert\n");
+ fflush(stderr);
}
m_prebuilt->sql_stat_start = TRUE;
@@ -16010,6 +16023,8 @@ ha_innobase::external_lock(
}
trx->bulk_insert = false;
trx->last_sql_stat_start.least_undo_no = trx->undo_no;
+ fprintf(stderr, "JAN::: end bulk insert\n");
+ fflush(stderr);
}
switch (m_prebuilt->table->quiesce) {
diff --git a/storage/innobase/row/row0ins.cc b/storage/innobase/row/row0ins.cc
index 2c7e39392ec..af3f964e088 100644
--- a/storage/innobase/row/row0ins.cc
+++ b/storage/innobase/row/row0ins.cc
@@ -2573,6 +2573,42 @@ but GCC 4.8.5 does not support pop_options. */
# pragma GCC optimize ("O0")
#endif
+#ifdef WITH_WSREP
+/** Start bulk insert operation for Galera by appending
+table-level exclusive key for bulk insert.
+@param trx transaction
+@param index index
+@retval false on success
+@retval true on failure */
+ATTRIBUTE_COLD static bool row_ins_wsrep_start_bulk(trx_t *trx, const dict_index_t &index)
+{
+ char db_buf[NAME_LEN + 1];
+ char tbl_buf[NAME_LEN + 1];
+ ulint db_buf_len, tbl_buf_len;
+
+ if (!index.table->parse_name(db_buf, tbl_buf, &db_buf_len, &tbl_buf_len))
+ {
+ WSREP_ERROR("Parse_name for bulk insert failed: %s",
+ wsrep_thd_query(trx->mysql_thd));
+ trx->error_state = DB_ROLLBACK;
+ return true;
+ }
+
+ /* Append table-level exclusive key for bulk insert. */
+ const int rcode = wsrep_thd_append_table_key(trx->mysql_thd, db_buf,
+ tbl_buf, WSREP_SERVICE_KEY_EXCLUSIVE);
+ if (rcode)
+ {
+ WSREP_ERROR("Appending table key for bulk insert failed: %s, %d",
+ wsrep_thd_query(trx->mysql_thd), rcode);
+ trx->error_state = DB_ROLLBACK;
+ return true;
+ }
+
+ return false;
+}
+#endif
+
/***************************************************************//**
Tries to insert an entry into a clustered index, ignoring foreign key
constraints. If a record with the same unique key is found, the other
@@ -2703,7 +2739,7 @@ commit_exit:
&& !index->table->n_rec_locks
&& !index->table->is_active_ddl()
&& !index->table->has_spatial_index()
- && !trx->is_wsrep() /* FIXME: MDEV-24623 */
+ && !index->table->versioned()
&& !thd_is_slave(trx->mysql_thd) /* FIXME: MDEV-24622 */) {
DEBUG_SYNC_C("empty_root_page_insert");
@@ -2724,6 +2760,19 @@ avoid_bulk:
goto skip_bulk_insert;
}
+#ifdef WITH_WSREP
+ if (trx->is_wsrep())
+ {
+ if (!wsrep_thd_is_local_transaction(trx->mysql_thd))
+ goto skip_bulk_insert;
+ if (row_ins_wsrep_start_bulk(trx, *index))
+ goto commit_exit;
+
+ fprintf(stderr, "JAN:: bulk insert started");
+ fflush(stderr);
+ }
+#endif /* WITH_WSREP */
+
#ifdef BTR_CUR_HASH_ADAPT
if (btr_search_enabled) {
btr_search_x_lock_all();