diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/wsrep_mysqld.h | 2 | ||||
-rw-r--r-- | sql/wsrep_schema.cc | 79 | ||||
-rw-r--r-- | sql/wsrep_schema.h | 19 |
3 files changed, 46 insertions, 54 deletions
diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 3c430ccf487..4bf1d1d60a2 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -303,7 +303,6 @@ extern mysql_mutex_t LOCK_wsrep_slave_threads; extern mysql_mutex_t LOCK_wsrep_desync; extern mysql_mutex_t LOCK_wsrep_SR_pool; extern mysql_mutex_t LOCK_wsrep_SR_store; -extern mysql_mutex_t LOCK_wsrep_thd_pool; extern mysql_mutex_t LOCK_wsrep_config_state; extern my_bool wsrep_emulate_bin_log; extern int wsrep_to_isolation; @@ -330,7 +329,6 @@ extern PSI_mutex_key key_LOCK_wsrep_slave_threads; extern PSI_mutex_key key_LOCK_wsrep_desync; extern PSI_mutex_key key_LOCK_wsrep_SR_pool; extern PSI_mutex_key key_LOCK_wsrep_SR_store; -extern PSI_mutex_key key_LOCK_wsrep_thd_pool; extern PSI_mutex_key key_LOCK_wsrep_global_seqno; extern PSI_mutex_key key_LOCK_wsrep_thd_queue; extern PSI_cond_key key_COND_wsrep_thd_queue; diff --git a/sql/wsrep_schema.cc b/sql/wsrep_schema.cc index 98f17e41c94..57b116e899c 100644 --- a/sql/wsrep_schema.cc +++ b/sql/wsrep_schema.cc @@ -1,4 +1,4 @@ -/* Copyright (C) 2015-2017 Codership Oy <info@codership.com> +/* Copyright (C) 2015-2019 Codership Oy <info@codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -584,8 +584,6 @@ static void wsrep_init_thd_for_schema(THD *thd) thd->real_id=pthread_self(); // Keep purify happy - WSREP_DEBUG("Wsrep_thd_pool: creating system thread: %lld", - (long long)thd->thread_id); thd->prior_thr_create_utime= thd->start_utime= thd->thr_create_utime; (void) mysql_mutex_unlock(&LOCK_thread_count); @@ -1115,7 +1113,7 @@ int Wsrep_schema::remove_fragments(THD* thd, DBUG_RETURN(ret); } -int Wsrep_schema::replay_transaction(THD* thd, +int Wsrep_schema::replay_transaction(THD* orig_thd, Relay_log_info* rli, const wsrep::ws_meta& ws_meta, const std::vector<wsrep::seqno>& fragments) @@ -1123,8 +1121,13 @@ int Wsrep_schema::replay_transaction(THD* thd, DBUG_ENTER("Wsrep_schema::replay_transaction"); DBUG_ASSERT(!fragments.empty()); - Wsrep_schema_impl::wsrep_off wsrep_off(thd); - Wsrep_schema_impl::binlog_off binlog_off(thd); + THD thd(next_thread_id(), true); + thd.thread_stack= (orig_thd ? orig_thd->thread_stack : + (char*) &thd); + + Wsrep_schema_impl::wsrep_off wsrep_off(&thd); + Wsrep_schema_impl::binlog_off binlog_off(&thd); + Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd, &thd); int ret= 1; int error; @@ -1135,11 +1138,11 @@ int Wsrep_schema::replay_transaction(THD* thd, for (std::vector<wsrep::seqno>::const_iterator i= fragments.begin(); i != fragments.end(); ++i) { - Wsrep_schema_impl::init_stmt(thd); - if ((error= Wsrep_schema_impl::open_for_read(thd, sr_table_str.c_str(), &frag_table))) + Wsrep_schema_impl::init_stmt(&thd); + if ((error= Wsrep_schema_impl::open_for_read(&thd, sr_table_str.c_str(), &frag_table))) { WSREP_WARN("Could not open SR table for read: %d", error); - Wsrep_schema_impl::finish_stmt(thd); + Wsrep_schema_impl::finish_stmt(&thd); DBUG_RETURN(1); } @@ -1169,20 +1172,28 @@ int Wsrep_schema::replay_transaction(THD* thd, String buf; frag_table->field[4]->val_str(&buf); - Wsrep_schema_impl::end_index_scan(frag_table); - Wsrep_schema_impl::finish_stmt(thd); - ret= wsrep_apply_events(thd, rli, buf.c_ptr_safe(), buf.length()); - if (ret) { - WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments"); - break; + Wsrep_schema_impl::thd_context_switch thd_context_switch(&thd, orig_thd); + + ret= wsrep_apply_events(orig_thd, rli, buf.c_ptr_quick(), buf.length()); + if (ret) + { + WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments"); + break; + } } - Wsrep_schema_impl::init_stmt(thd); - if ((error= Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table))) + Wsrep_schema_impl::end_index_scan(frag_table); + Wsrep_schema_impl::finish_stmt(&thd); + + Wsrep_schema_impl::init_stmt(&thd); + + if ((error= Wsrep_schema_impl::open_for_write(&thd, + sr_table_str.c_str(), + &frag_table))) { WSREP_WARN("Could not open SR table for write: %d", error); - Wsrep_schema_impl::finish_stmt(thd); + Wsrep_schema_impl::finish_stmt(&thd); DBUG_RETURN(1); } error= Wsrep_schema_impl::init_for_index_scan(frag_table, @@ -1206,7 +1217,7 @@ int Wsrep_schema::replay_transaction(THD* thd, break; } Wsrep_schema_impl::end_index_scan(frag_table); - Wsrep_schema_impl::finish_stmt(thd); + Wsrep_schema_impl::finish_stmt(&thd); } DBUG_RETURN(ret); @@ -1215,14 +1226,14 @@ int Wsrep_schema::replay_transaction(THD* thd, int Wsrep_schema::recover_sr_transactions(THD *orig_thd) { DBUG_ENTER("Wsrep_schema::recover_sr_transactions"); - THD storage_thd(true, true); + THD storage_thd(next_thread_id(), true); storage_thd.thread_stack= (orig_thd ? orig_thd->thread_stack : (char*) &storage_thd); TABLE* frag_table= 0; TABLE* cluster_table= 0; Wsrep_storage_service storage_service(&storage_thd); Wsrep_schema_impl::binlog_off binlog_off(&storage_thd); - Wsrep_schema_impl::wsrep_off binglog_off(&storage_thd); + Wsrep_schema_impl::wsrep_off wsrep_off(&storage_thd); Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd, &storage_thd); Wsrep_server_state& server_state(Wsrep_server_state::instance()); @@ -1233,13 +1244,9 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) Wsrep_schema_impl::init_stmt(&storage_thd); storage_thd.wsrep_skip_locking= FALSE; - /* - Open the table for reading and writing so that fragments without - valid seqno can be deleted. - */ - if (Wsrep_schema_impl::open_for_write(&storage_thd, - cluster_table_str.c_str(), - &cluster_table) || + if (Wsrep_schema_impl::open_for_read(&storage_thd, + cluster_table_str.c_str(), + &cluster_table) || Wsrep_schema_impl::init_for_scan(cluster_table)) { Wsrep_schema_impl::finish_stmt(&storage_thd); @@ -1273,10 +1280,15 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) storage_thd.wsrep_skip_locking= TRUE; Wsrep_schema_impl::init_stmt(&storage_thd); - if (Wsrep_schema_impl::open_for_read(&storage_thd, sr_table_str.c_str(), &frag_table) || + + /* + Open the table for reading and writing so that fragments without + valid seqno can be deleted. + */ + if (Wsrep_schema_impl::open_for_write(&storage_thd, sr_table_str.c_str(), &frag_table) || Wsrep_schema_impl::init_for_scan(frag_table)) { - WSREP_ERROR("Failed to open SR table for read"); + WSREP_ERROR("Failed to open SR table for write"); goto out; } @@ -1309,7 +1321,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) String data_str; (void)frag_table->field[4]->val_str(&data_str); - wsrep::const_buffer data(data_str.c_ptr(), data_str.length()); + wsrep::const_buffer data(data_str.c_ptr_quick(), data_str.length()); wsrep::ws_meta ws_meta(gtid, wsrep::stid(server_id, transaction_id, @@ -1319,14 +1331,13 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) wsrep::high_priority_service* applier; if (!(applier= server_state.find_streaming_applier(server_id, - transaction_id))) + transaction_id))) { DBUG_ASSERT(wsrep::starts_transaction(flags)); - THD* thd= new THD(true, true); + THD* thd= new THD(next_thread_id(), true); thd->thread_stack= (char*)&storage_thd; mysql_mutex_lock(&LOCK_thread_count); - thd->thread_id= next_thread_id(); thd->real_id= pthread_self(); mysql_mutex_unlock(&LOCK_thread_count); diff --git a/sql/wsrep_schema.h b/sql/wsrep_schema.h index fb5eaa8931f..36e23998d19 100644 --- a/sql/wsrep_schema.h +++ b/sql/wsrep_schema.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2015-2018 Codership Oy <info@codership.com> +/* Copyright (C) 2015-2019 Codership Oy <info@codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -20,13 +20,9 @@ /* wsrep-lib */ #include "wsrep_types.h" - #include "mysqld.h" -#include "thr_lock.h" /* enum thr_lock_type */ #include "wsrep_mysqld.h" -#include <string> - /* Forward decls */ @@ -64,14 +60,6 @@ class Wsrep_schema */ Wsrep_view restore_view(THD* thd, const Wsrep_id& own_id) const; - /* - Append transaction fragment to fragment storage. - Starts a trx using a THD from thd_pool, does not commit. - Should be followed by a call to update_frag_seqno(), or - release_SR_thd() if wsrep->certify() fails. - */ - THD* append_frag(const wsrep_trx_meta_t&, uint32_t, - const unsigned char*, size_t); /** Append transaction fragment to fragment storage. Transaction must have been started for THD before this call. @@ -145,11 +133,6 @@ class Wsrep_schema */ int recover_sr_transactions(THD* orig_thd); - /* - Close wsrep schema. - */ - void close(); - private: /* Non-copyable */ Wsrep_schema(const Wsrep_schema&); |