diff options
Diffstat (limited to 'sql/wsrep_schema.cc')
-rw-r--r-- | sql/wsrep_schema.cc | 79 |
1 files changed, 45 insertions, 34 deletions
diff --git a/sql/wsrep_schema.cc b/sql/wsrep_schema.cc index 98f17e41c94..57b116e899c 100644 --- a/sql/wsrep_schema.cc +++ b/sql/wsrep_schema.cc @@ -1,4 +1,4 @@ -/* Copyright (C) 2015-2017 Codership Oy <info@codership.com> +/* Copyright (C) 2015-2019 Codership Oy <info@codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -584,8 +584,6 @@ static void wsrep_init_thd_for_schema(THD *thd) thd->real_id=pthread_self(); // Keep purify happy - WSREP_DEBUG("Wsrep_thd_pool: creating system thread: %lld", - (long long)thd->thread_id); thd->prior_thr_create_utime= thd->start_utime= thd->thr_create_utime; (void) mysql_mutex_unlock(&LOCK_thread_count); @@ -1115,7 +1113,7 @@ int Wsrep_schema::remove_fragments(THD* thd, DBUG_RETURN(ret); } -int Wsrep_schema::replay_transaction(THD* thd, +int Wsrep_schema::replay_transaction(THD* orig_thd, Relay_log_info* rli, const wsrep::ws_meta& ws_meta, const std::vector<wsrep::seqno>& fragments) @@ -1123,8 +1121,13 @@ int Wsrep_schema::replay_transaction(THD* thd, DBUG_ENTER("Wsrep_schema::replay_transaction"); DBUG_ASSERT(!fragments.empty()); - Wsrep_schema_impl::wsrep_off wsrep_off(thd); - Wsrep_schema_impl::binlog_off binlog_off(thd); + THD thd(next_thread_id(), true); + thd.thread_stack= (orig_thd ? orig_thd->thread_stack : + (char*) &thd); + + Wsrep_schema_impl::wsrep_off wsrep_off(&thd); + Wsrep_schema_impl::binlog_off binlog_off(&thd); + Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd, &thd); int ret= 1; int error; @@ -1135,11 +1138,11 @@ int Wsrep_schema::replay_transaction(THD* thd, for (std::vector<wsrep::seqno>::const_iterator i= fragments.begin(); i != fragments.end(); ++i) { - Wsrep_schema_impl::init_stmt(thd); - if ((error= Wsrep_schema_impl::open_for_read(thd, sr_table_str.c_str(), &frag_table))) + Wsrep_schema_impl::init_stmt(&thd); + if ((error= Wsrep_schema_impl::open_for_read(&thd, sr_table_str.c_str(), &frag_table))) { WSREP_WARN("Could not open SR table for read: %d", error); - Wsrep_schema_impl::finish_stmt(thd); + Wsrep_schema_impl::finish_stmt(&thd); DBUG_RETURN(1); } @@ -1169,20 +1172,28 @@ int Wsrep_schema::replay_transaction(THD* thd, String buf; frag_table->field[4]->val_str(&buf); - Wsrep_schema_impl::end_index_scan(frag_table); - Wsrep_schema_impl::finish_stmt(thd); - ret= wsrep_apply_events(thd, rli, buf.c_ptr_safe(), buf.length()); - if (ret) { - WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments"); - break; + Wsrep_schema_impl::thd_context_switch thd_context_switch(&thd, orig_thd); + + ret= wsrep_apply_events(orig_thd, rli, buf.c_ptr_quick(), buf.length()); + if (ret) + { + WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments"); + break; + } } - Wsrep_schema_impl::init_stmt(thd); - if ((error= Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table))) + Wsrep_schema_impl::end_index_scan(frag_table); + Wsrep_schema_impl::finish_stmt(&thd); + + Wsrep_schema_impl::init_stmt(&thd); + + if ((error= Wsrep_schema_impl::open_for_write(&thd, + sr_table_str.c_str(), + &frag_table))) { WSREP_WARN("Could not open SR table for write: %d", error); - Wsrep_schema_impl::finish_stmt(thd); + Wsrep_schema_impl::finish_stmt(&thd); DBUG_RETURN(1); } error= Wsrep_schema_impl::init_for_index_scan(frag_table, @@ -1206,7 +1217,7 @@ int Wsrep_schema::replay_transaction(THD* thd, break; } Wsrep_schema_impl::end_index_scan(frag_table); - Wsrep_schema_impl::finish_stmt(thd); + Wsrep_schema_impl::finish_stmt(&thd); } DBUG_RETURN(ret); @@ -1215,14 +1226,14 @@ int Wsrep_schema::replay_transaction(THD* thd, int Wsrep_schema::recover_sr_transactions(THD *orig_thd) { DBUG_ENTER("Wsrep_schema::recover_sr_transactions"); - THD storage_thd(true, true); + THD storage_thd(next_thread_id(), true); storage_thd.thread_stack= (orig_thd ? orig_thd->thread_stack : (char*) &storage_thd); TABLE* frag_table= 0; TABLE* cluster_table= 0; Wsrep_storage_service storage_service(&storage_thd); Wsrep_schema_impl::binlog_off binlog_off(&storage_thd); - Wsrep_schema_impl::wsrep_off binglog_off(&storage_thd); + Wsrep_schema_impl::wsrep_off wsrep_off(&storage_thd); Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd, &storage_thd); Wsrep_server_state& server_state(Wsrep_server_state::instance()); @@ -1233,13 +1244,9 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) Wsrep_schema_impl::init_stmt(&storage_thd); storage_thd.wsrep_skip_locking= FALSE; - /* - Open the table for reading and writing so that fragments without - valid seqno can be deleted. - */ - if (Wsrep_schema_impl::open_for_write(&storage_thd, - cluster_table_str.c_str(), - &cluster_table) || + if (Wsrep_schema_impl::open_for_read(&storage_thd, + cluster_table_str.c_str(), + &cluster_table) || Wsrep_schema_impl::init_for_scan(cluster_table)) { Wsrep_schema_impl::finish_stmt(&storage_thd); @@ -1273,10 +1280,15 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) storage_thd.wsrep_skip_locking= TRUE; Wsrep_schema_impl::init_stmt(&storage_thd); - if (Wsrep_schema_impl::open_for_read(&storage_thd, sr_table_str.c_str(), &frag_table) || + + /* + Open the table for reading and writing so that fragments without + valid seqno can be deleted. + */ + if (Wsrep_schema_impl::open_for_write(&storage_thd, sr_table_str.c_str(), &frag_table) || Wsrep_schema_impl::init_for_scan(frag_table)) { - WSREP_ERROR("Failed to open SR table for read"); + WSREP_ERROR("Failed to open SR table for write"); goto out; } @@ -1309,7 +1321,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) String data_str; (void)frag_table->field[4]->val_str(&data_str); - wsrep::const_buffer data(data_str.c_ptr(), data_str.length()); + wsrep::const_buffer data(data_str.c_ptr_quick(), data_str.length()); wsrep::ws_meta ws_meta(gtid, wsrep::stid(server_id, transaction_id, @@ -1319,14 +1331,13 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) wsrep::high_priority_service* applier; if (!(applier= server_state.find_streaming_applier(server_id, - transaction_id))) + transaction_id))) { DBUG_ASSERT(wsrep::starts_transaction(flags)); - THD* thd= new THD(true, true); + THD* thd= new THD(next_thread_id(), true); thd->thread_stack= (char*)&storage_thd; mysql_mutex_lock(&LOCK_thread_count); - thd->thread_id= next_thread_id(); thd->real_id= pthread_self(); mysql_mutex_unlock(&LOCK_thread_count); |