diff options
author | Denis Protivensky <denis.protivensky@galeracluster.com> | 2022-12-05 17:03:32 +0300 |
---|---|---|
committer | Jan Lindström <jan.lindstrom@mariadb.com> | 2023-01-24 12:43:12 +0200 |
commit | 653c0c6f5721f5004a32570f5870c0cc8477f1f3 (patch) | |
tree | 2efb7f2d00a8a7fa681df133911e398e792a1dc6 | |
parent | 946b6f0f7d5237eeca811bd177380235c54d11c4 (diff) | |
download | mariadb-git-bb-10.7-MDEV-24623.tar.gz |
MDEV-24623 Replicate bulk insert as table-level exclusive keybb-10.7-MDEV-24623
- introduce table key construction function in wsrep service interface
- don't add row keys when replicating bulk insert
- don't start bulk insert on applier or when transaction is not active
- don't start bulk insert for system versioned tables
- implement actual bulk insert table-level key replication
Reviewed-by: Jan Lindström <jan.lindstrom@mariadb.com>
-rw-r--r-- | include/mysql/service_wsrep.h | 12 | ||||
-rw-r--r-- | mysql-test/suite/galera/r/galera_insert_bulk.result | 30 | ||||
-rw-r--r-- | mysql-test/suite/galera/t/galera_insert_bulk.test | 88 | ||||
-rw-r--r-- | sql/service_wsrep.cc | 20 | ||||
-rw-r--r-- | sql/sql_plugin_services.inl | 2 | ||||
-rw-r--r-- | sql/wsrep_dummy.cc | 6 | ||||
-rw-r--r-- | storage/innobase/handler/ha_innodb.cc | 17 | ||||
-rw-r--r-- | storage/innobase/row/row0ins.cc | 51 |
8 files changed, 224 insertions, 2 deletions
diff --git a/include/mysql/service_wsrep.h b/include/mysql/service_wsrep.h index 42b758c03f3..8541b348ae4 100644 --- a/include/mysql/service_wsrep.h +++ b/include/mysql/service_wsrep.h @@ -69,6 +69,9 @@ extern struct wsrep_service_st { void (*wsrep_thd_self_abort_func)(MYSQL_THD thd); int (*wsrep_thd_append_key_func)(MYSQL_THD thd, const struct wsrep_key* key, int n_keys, enum Wsrep_service_key_type); + int (*wsrep_thd_append_table_key_func)(MYSQL_THD thd, const char* db, + const char* table, enum Wsrep_service_key_type); + my_bool (*wsrep_thd_is_local_transaction)(const MYSQL_THD thd); const char* (*wsrep_thd_client_state_str_func)(const MYSQL_THD thd); const char* (*wsrep_thd_client_mode_str_func)(const MYSQL_THD thd); const char* (*wsrep_thd_transaction_state_str_func)(const MYSQL_THD thd); @@ -121,6 +124,8 @@ extern struct wsrep_service_st { #define wsrep_thd_is_local(T) wsrep_service->wsrep_thd_is_local_func(T) #define wsrep_thd_self_abort(T) wsrep_service->wsrep_thd_self_abort_func(T) #define wsrep_thd_append_key(T,W,N,K) wsrep_service->wsrep_thd_append_key_func(T,W,N,K) +#define wsrep_thd_append_table_key(T,D,B,K) wsrep_service->wsrep_thd_append_table_key_func(T,D,B,K) +#define wsrep_thd_is_local_transaction(T) wsrep_service->wsrep_thd_is_local_transaction_func(T) #define wsrep_thd_client_state_str(T) wsrep_service->wsrep_thd_client_state_str_func(T) #define wsrep_thd_client_mode_str(T) wsrep_service->wsrep_thd_client_mode_str_func(T) #define wsrep_thd_transaction_state_str(T) wsrep_service->wsrep_thd_transaction_state_str_func(T) @@ -226,6 +231,13 @@ extern "C" int wsrep_thd_append_key(MYSQL_THD thd, int n_keys, enum Wsrep_service_key_type); +extern "C" int wsrep_thd_append_table_key(MYSQL_THD thd, + const char* db, + const char* table, + enum Wsrep_service_key_type); + +extern "C" my_bool wsrep_thd_is_local_transaction(const MYSQL_THD thd); + extern const char* wsrep_sr_table_name_full; extern "C" const char* wsrep_get_sr_table_name(); diff --git a/mysql-test/suite/galera/r/galera_insert_bulk.result b/mysql-test/suite/galera/r/galera_insert_bulk.result new file mode 100644 index 00000000000..f4d4adf64e1 --- /dev/null +++ b/mysql-test/suite/galera/r/galera_insert_bulk.result @@ -0,0 +1,30 @@ +connection node_2; +connection node_1; +connection node_1; +CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB; +SET foreign_key_checks = 0; +SET unique_checks = 0; +START TRANSACTION; +connection node_2; +SET foreign_key_checks = 1; +SET unique_checks = 1; +INSERT INTO t1 VALUES (1001); +connection node_1; +COMMIT; +ERROR 40001: Deadlock found when trying to get lock; try restarting transaction +DROP TABLE t1; +connection node_1; +CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB; +START TRANSACTION; +connection node_2; +SET foreign_key_checks = 1; +SET unique_checks = 1; +START TRANSACTION; +INSERT INTO t1 VALUES (1001); +connection node_1; +COMMIT; +2 +connection node_2; +COMMIT; +ERROR 40001: Deadlock found when trying to get lock; try restarting transaction +DROP TABLE t1; diff --git a/mysql-test/suite/galera/t/galera_insert_bulk.test b/mysql-test/suite/galera/t/galera_insert_bulk.test new file mode 100644 index 00000000000..f58870d5f74 --- /dev/null +++ b/mysql-test/suite/galera/t/galera_insert_bulk.test @@ -0,0 +1,88 @@ +# +# Test that bulk insert replicates as table-level exclusive key and +# rolls back properly if needed. +# + +--source include/galera_cluster.inc +--source include/have_innodb.inc + +# +# Make bulk insert BF-abort, but regular insert succeed. +# + +--connection node_1 +CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB; + +# Disable foreign and unique key checks to allow bulk insert. +SET foreign_key_checks = 0; +SET unique_checks = 0; + +START TRANSACTION; + +--let $count=0 +--disable_query_log +while ($count < 1000) +{ + --eval INSERT INTO t1 VALUES ($count) + --inc $count +} +--enable_query_log + +--connection node_2 + +# Disable bulk insert. +SET foreign_key_checks = 1; +SET unique_checks = 1; + +# Insert a value out of the bulk insert range. +INSERT INTO t1 VALUES (1001); + +--connection node_1 +--error ER_LOCK_DEADLOCK +COMMIT; + +DROP TABLE t1; + +# +# Make bulk insert succeed, but regular insert BF-abort. +# + +--connection node_1 +CREATE TABLE t1 (f1 INTEGER PRIMARY KEY) ENGINE=InnoDB; + +--let $before_bulk_keys = `SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_repl_keys'` + +START TRANSACTION; + +--let $count=0 +--disable_query_log +while ($count < 1000) +{ + --eval INSERT INTO t1 VALUES ($count) + --inc $count +} +--enable_query_log + +--connection node_2 + +# Disable bulk insert. +SET foreign_key_checks = 1; +SET unique_checks = 1; + +START TRANSACTION; + +# Insert a value out of the bulk insert range. +INSERT INTO t1 VALUES (1001); + +--connection node_1 +COMMIT; + +# Expect two keys to be added for bulk insert: DB-level shared key and table-level exclusive key. +--let $bulk_keys_count = `SELECT VARIABLE_VALUE - $before_bulk_keys FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_repl_keys'` +--echo $bulk_keys_count + +--connection node_2 +--error ER_LOCK_DEADLOCK +COMMIT; + +DROP TABLE t1; diff --git a/sql/service_wsrep.cc b/sql/service_wsrep.cc index 943db803242..a5fa4f73fd6 100644 --- a/sql/service_wsrep.cc +++ b/sql/service_wsrep.cc @@ -417,3 +417,23 @@ extern "C" void wsrep_thd_set_PA_unsafe(THD *thd) WSREP_DEBUG("session does not have active transaction, can not mark as PA unsafe"); } } + +extern "C" int wsrep_thd_append_table_key(MYSQL_THD thd, + const char* db, + const char* table, + enum Wsrep_service_key_type key_type) +{ + wsrep_key_arr_t key_arr = {0, 0}; + int ret = wsrep_prepare_keys_for_isolation(thd, db, table, NULL, &key_arr); + ret = ret || wsrep_thd_append_key(thd, key_arr.keys, + (int)key_arr.keys_len, key_type); + wsrep_keys_free(&key_arr); + return ret; +} + +extern "C" my_bool wsrep_thd_is_local_transaction(const THD *thd) +{ + return (wsrep_thd_is_local(thd) && + thd->wsrep_cs().transaction().active()); +} + diff --git a/sql/sql_plugin_services.inl b/sql/sql_plugin_services.inl index e883081e9f7..3a66e982e7b 100644 --- a/sql/sql_plugin_services.inl +++ b/sql/sql_plugin_services.inl @@ -162,6 +162,8 @@ static struct wsrep_service_st wsrep_handler = { wsrep_thd_is_local, wsrep_thd_self_abort, wsrep_thd_append_key, + wsrep_thd_append_table_key, + wsrep_thd_is_local_transaction, wsrep_thd_client_state_str, wsrep_thd_client_mode_str, wsrep_thd_transaction_state_str, diff --git a/sql/wsrep_dummy.cc b/sql/wsrep_dummy.cc index ac14fc4597a..9bfaf9285f3 100644 --- a/sql/wsrep_dummy.cc +++ b/sql/wsrep_dummy.cc @@ -101,6 +101,12 @@ void wsrep_thd_self_abort(THD *) int wsrep_thd_append_key(THD *, const struct wsrep_key*, int, enum Wsrep_service_key_type) { return 0; } +int wsrep_thd_append_table_key(THD *, const char*, const char*, enum Wsrep_service_key_type) +{ return 0; } + +my_bool wsrep_thd_is_local_transaction(const THD*) +{ return 0; } + const char* wsrep_thd_client_state_str(const THD*) { return 0; } diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index 1a745039c14..c9c4a8e3814 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -8023,6 +8023,7 @@ report_error: #ifdef WITH_WSREP if (!error_result && trx->is_wsrep() + && !trx->is_bulk_insert() && wsrep_thd_is_local(m_user_thd) && !wsrep_thd_ignore_table(m_user_thd) && !wsrep_consistency_check(m_user_thd) @@ -8734,6 +8735,9 @@ ha_innobase::delete_row( DBUG_ENTER("ha_innobase::delete_row"); + fprintf(stderr, "JAN::delete_row()"); + fflush(stderr); + if (is_read_only()) { DBUG_RETURN(HA_ERR_TABLE_READONLY); } else if (!trx_is_started(trx)) { @@ -8754,10 +8758,13 @@ ha_innobase::delete_row( error = row_update_for_mysql(m_prebuilt); + ut_ad(!trx->is_bulk_insert()); + #ifdef WITH_WSREP if (error == DB_SUCCESS && trx->is_wsrep() && wsrep_thd_is_local(m_user_thd) - && !wsrep_thd_ignore_table(m_user_thd)) { + && !wsrep_thd_ignore_table(m_user_thd)) + { if (wsrep_append_keys(m_user_thd, WSREP_SERVICE_KEY_EXCLUSIVE, record, NULL)) { @@ -10126,6 +10133,8 @@ wsrep_append_key( (shared, exclusive, semi...) */ ) { + ut_ad(!trx->is_bulk_insert()); + DBUG_ENTER("wsrep_append_key"); DBUG_PRINT("enter", ("thd: %lu trx: %lld", thd_get_thread_id(thd), @@ -15680,6 +15689,8 @@ ha_innobase::extra( trx->bulk_insert_apply(); trx->end_bulk_insert(*m_prebuilt->table); trx->bulk_insert = false; + fprintf(stderr, "JAN::: end bulk insert\n"); + fflush(stderr); break; case HA_EXTRA_NO_KEYREAD: m_prebuilt->read_just_key = 0; @@ -15822,6 +15833,8 @@ ha_innobase::start_stmt( trx->end_bulk_insert(); trx->bulk_insert = false; trx->last_sql_stat_start.least_undo_no = trx->undo_no; + fprintf(stderr, "JAN::: end bulk insert\n"); + fflush(stderr); } m_prebuilt->sql_stat_start = TRUE; @@ -16010,6 +16023,8 @@ ha_innobase::external_lock( } trx->bulk_insert = false; trx->last_sql_stat_start.least_undo_no = trx->undo_no; + fprintf(stderr, "JAN::: end bulk insert\n"); + fflush(stderr); } switch (m_prebuilt->table->quiesce) { diff --git a/storage/innobase/row/row0ins.cc b/storage/innobase/row/row0ins.cc index 2c7e39392ec..af3f964e088 100644 --- a/storage/innobase/row/row0ins.cc +++ b/storage/innobase/row/row0ins.cc @@ -2573,6 +2573,42 @@ but GCC 4.8.5 does not support pop_options. */ # pragma GCC optimize ("O0") #endif +#ifdef WITH_WSREP +/** Start bulk insert operation for Galera by appending +table-level exclusive key for bulk insert. +@param trx transaction +@param index index +@retval false on success +@retval true on failure */ +ATTRIBUTE_COLD static bool row_ins_wsrep_start_bulk(trx_t *trx, const dict_index_t &index) +{ + char db_buf[NAME_LEN + 1]; + char tbl_buf[NAME_LEN + 1]; + ulint db_buf_len, tbl_buf_len; + + if (!index.table->parse_name(db_buf, tbl_buf, &db_buf_len, &tbl_buf_len)) + { + WSREP_ERROR("Parse_name for bulk insert failed: %s", + wsrep_thd_query(trx->mysql_thd)); + trx->error_state = DB_ROLLBACK; + return true; + } + + /* Append table-level exclusive key for bulk insert. */ + const int rcode = wsrep_thd_append_table_key(trx->mysql_thd, db_buf, + tbl_buf, WSREP_SERVICE_KEY_EXCLUSIVE); + if (rcode) + { + WSREP_ERROR("Appending table key for bulk insert failed: %s, %d", + wsrep_thd_query(trx->mysql_thd), rcode); + trx->error_state = DB_ROLLBACK; + return true; + } + + return false; +} +#endif + /***************************************************************//** Tries to insert an entry into a clustered index, ignoring foreign key constraints. If a record with the same unique key is found, the other @@ -2703,7 +2739,7 @@ commit_exit: && !index->table->n_rec_locks && !index->table->is_active_ddl() && !index->table->has_spatial_index() - && !trx->is_wsrep() /* FIXME: MDEV-24623 */ + && !index->table->versioned() && !thd_is_slave(trx->mysql_thd) /* FIXME: MDEV-24622 */) { DEBUG_SYNC_C("empty_root_page_insert"); @@ -2724,6 +2760,19 @@ avoid_bulk: goto skip_bulk_insert; } +#ifdef WITH_WSREP + if (trx->is_wsrep()) + { + if (!wsrep_thd_is_local_transaction(trx->mysql_thd)) + goto skip_bulk_insert; + if (row_ins_wsrep_start_bulk(trx, *index)) + goto commit_exit; + + fprintf(stderr, "JAN:: bulk insert started"); + fflush(stderr); + } +#endif /* WITH_WSREP */ + #ifdef BTR_CUR_HASH_ADAPT if (btr_search_enabled) { btr_search_x_lock_all(); |