summaryrefslogtreecommitdiff
path: root/sql/wsrep_schema.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_schema.cc')
-rw-r--r--sql/wsrep_schema.cc79
1 files changed, 45 insertions, 34 deletions
diff --git a/sql/wsrep_schema.cc b/sql/wsrep_schema.cc
index 98f17e41c94..57b116e899c 100644
--- a/sql/wsrep_schema.cc
+++ b/sql/wsrep_schema.cc
@@ -1,4 +1,4 @@
-/* Copyright (C) 2015-2017 Codership Oy <info@codership.com>
+/* Copyright (C) 2015-2019 Codership Oy <info@codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -584,8 +584,6 @@ static void wsrep_init_thd_for_schema(THD *thd)
thd->real_id=pthread_self(); // Keep purify happy
- WSREP_DEBUG("Wsrep_thd_pool: creating system thread: %lld",
- (long long)thd->thread_id);
thd->prior_thr_create_utime= thd->start_utime= thd->thr_create_utime;
(void) mysql_mutex_unlock(&LOCK_thread_count);
@@ -1115,7 +1113,7 @@ int Wsrep_schema::remove_fragments(THD* thd,
DBUG_RETURN(ret);
}
-int Wsrep_schema::replay_transaction(THD* thd,
+int Wsrep_schema::replay_transaction(THD* orig_thd,
Relay_log_info* rli,
const wsrep::ws_meta& ws_meta,
const std::vector<wsrep::seqno>& fragments)
@@ -1123,8 +1121,13 @@ int Wsrep_schema::replay_transaction(THD* thd,
DBUG_ENTER("Wsrep_schema::replay_transaction");
DBUG_ASSERT(!fragments.empty());
- Wsrep_schema_impl::wsrep_off wsrep_off(thd);
- Wsrep_schema_impl::binlog_off binlog_off(thd);
+ THD thd(next_thread_id(), true);
+ thd.thread_stack= (orig_thd ? orig_thd->thread_stack :
+ (char*) &thd);
+
+ Wsrep_schema_impl::wsrep_off wsrep_off(&thd);
+ Wsrep_schema_impl::binlog_off binlog_off(&thd);
+ Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd, &thd);
int ret= 1;
int error;
@@ -1135,11 +1138,11 @@ int Wsrep_schema::replay_transaction(THD* thd,
for (std::vector<wsrep::seqno>::const_iterator i= fragments.begin();
i != fragments.end(); ++i)
{
- Wsrep_schema_impl::init_stmt(thd);
- if ((error= Wsrep_schema_impl::open_for_read(thd, sr_table_str.c_str(), &frag_table)))
+ Wsrep_schema_impl::init_stmt(&thd);
+ if ((error= Wsrep_schema_impl::open_for_read(&thd, sr_table_str.c_str(), &frag_table)))
{
WSREP_WARN("Could not open SR table for read: %d", error);
- Wsrep_schema_impl::finish_stmt(thd);
+ Wsrep_schema_impl::finish_stmt(&thd);
DBUG_RETURN(1);
}
@@ -1169,20 +1172,28 @@ int Wsrep_schema::replay_transaction(THD* thd,
String buf;
frag_table->field[4]->val_str(&buf);
- Wsrep_schema_impl::end_index_scan(frag_table);
- Wsrep_schema_impl::finish_stmt(thd);
- ret= wsrep_apply_events(thd, rli, buf.c_ptr_safe(), buf.length());
- if (ret)
{
- WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments");
- break;
+ Wsrep_schema_impl::thd_context_switch thd_context_switch(&thd, orig_thd);
+
+ ret= wsrep_apply_events(orig_thd, rli, buf.c_ptr_quick(), buf.length());
+ if (ret)
+ {
+ WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments");
+ break;
+ }
}
- Wsrep_schema_impl::init_stmt(thd);
- if ((error= Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table)))
+ Wsrep_schema_impl::end_index_scan(frag_table);
+ Wsrep_schema_impl::finish_stmt(&thd);
+
+ Wsrep_schema_impl::init_stmt(&thd);
+
+ if ((error= Wsrep_schema_impl::open_for_write(&thd,
+ sr_table_str.c_str(),
+ &frag_table)))
{
WSREP_WARN("Could not open SR table for write: %d", error);
- Wsrep_schema_impl::finish_stmt(thd);
+ Wsrep_schema_impl::finish_stmt(&thd);
DBUG_RETURN(1);
}
error= Wsrep_schema_impl::init_for_index_scan(frag_table,
@@ -1206,7 +1217,7 @@ int Wsrep_schema::replay_transaction(THD* thd,
break;
}
Wsrep_schema_impl::end_index_scan(frag_table);
- Wsrep_schema_impl::finish_stmt(thd);
+ Wsrep_schema_impl::finish_stmt(&thd);
}
DBUG_RETURN(ret);
@@ -1215,14 +1226,14 @@ int Wsrep_schema::replay_transaction(THD* thd,
int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
{
DBUG_ENTER("Wsrep_schema::recover_sr_transactions");
- THD storage_thd(true, true);
+ THD storage_thd(next_thread_id(), true);
storage_thd.thread_stack= (orig_thd ? orig_thd->thread_stack :
(char*) &storage_thd);
TABLE* frag_table= 0;
TABLE* cluster_table= 0;
Wsrep_storage_service storage_service(&storage_thd);
Wsrep_schema_impl::binlog_off binlog_off(&storage_thd);
- Wsrep_schema_impl::wsrep_off binglog_off(&storage_thd);
+ Wsrep_schema_impl::wsrep_off wsrep_off(&storage_thd);
Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd,
&storage_thd);
Wsrep_server_state& server_state(Wsrep_server_state::instance());
@@ -1233,13 +1244,9 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
Wsrep_schema_impl::init_stmt(&storage_thd);
storage_thd.wsrep_skip_locking= FALSE;
- /*
- Open the table for reading and writing so that fragments without
- valid seqno can be deleted.
- */
- if (Wsrep_schema_impl::open_for_write(&storage_thd,
- cluster_table_str.c_str(),
- &cluster_table) ||
+ if (Wsrep_schema_impl::open_for_read(&storage_thd,
+ cluster_table_str.c_str(),
+ &cluster_table) ||
Wsrep_schema_impl::init_for_scan(cluster_table))
{
Wsrep_schema_impl::finish_stmt(&storage_thd);
@@ -1273,10 +1280,15 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
storage_thd.wsrep_skip_locking= TRUE;
Wsrep_schema_impl::init_stmt(&storage_thd);
- if (Wsrep_schema_impl::open_for_read(&storage_thd, sr_table_str.c_str(), &frag_table) ||
+
+ /*
+ Open the table for reading and writing so that fragments without
+ valid seqno can be deleted.
+ */
+ if (Wsrep_schema_impl::open_for_write(&storage_thd, sr_table_str.c_str(), &frag_table) ||
Wsrep_schema_impl::init_for_scan(frag_table))
{
- WSREP_ERROR("Failed to open SR table for read");
+ WSREP_ERROR("Failed to open SR table for write");
goto out;
}
@@ -1309,7 +1321,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
String data_str;
(void)frag_table->field[4]->val_str(&data_str);
- wsrep::const_buffer data(data_str.c_ptr(), data_str.length());
+ wsrep::const_buffer data(data_str.c_ptr_quick(), data_str.length());
wsrep::ws_meta ws_meta(gtid,
wsrep::stid(server_id,
transaction_id,
@@ -1319,14 +1331,13 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
wsrep::high_priority_service* applier;
if (!(applier= server_state.find_streaming_applier(server_id,
- transaction_id)))
+ transaction_id)))
{
DBUG_ASSERT(wsrep::starts_transaction(flags));
- THD* thd= new THD(true, true);
+ THD* thd= new THD(next_thread_id(), true);
thd->thread_stack= (char*)&storage_thd;
mysql_mutex_lock(&LOCK_thread_count);
- thd->thread_id= next_thread_id();
thd->real_id= pthread_self();
mysql_mutex_unlock(&LOCK_thread_count);