summaryrefslogtreecommitdiff
path: root/sql/wsrep_thd.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_thd.cc')
-rw-r--r--sql/wsrep_thd.cc346
1 files changed, 210 insertions, 136 deletions
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc
index addf98561de..50f0376f674 100644
--- a/sql/wsrep_thd.cc
+++ b/sql/wsrep_thd.cc
@@ -31,8 +31,9 @@
#include "rpl_rli.h"
#include "rpl_mi.h"
+extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys);
+
static Wsrep_thd_queue* wsrep_rollback_queue= 0;
-static Wsrep_thd_queue* wsrep_post_rollback_queue= 0;
static Atomic_counter<uint64_t> wsrep_bf_aborts_counter;
@@ -149,6 +150,122 @@ void wsrep_create_appliers(long threads)
}
}
+static void wsrep_rollback_streaming_aborted_by_toi(THD *thd)
+{
+ WSREP_INFO("wsrep_rollback_streaming_aborted_by_toi");
+ /* Set thd->event_scheduler.data temporarily to NULL to avoid
+ callbacks to threadpool wait_begin() during rollback. */
+ auto saved_esd= thd->event_scheduler.data;
+ thd->event_scheduler.data= 0;
+ if (thd->wsrep_cs().mode() == wsrep::client_state::m_high_priority)
+ {
+ DBUG_ASSERT(!saved_esd);
+ DBUG_ASSERT(thd->wsrep_applier_service);
+ thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
+ wsrep::ws_meta());
+ thd->wsrep_applier_service->after_apply();
+ /* Will free THD */
+ Wsrep_server_state::instance().server_service().
+ release_high_priority_service(thd->wsrep_applier_service);
+ }
+ else
+ {
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ /* prepare THD for rollback processing */
+ thd->reset_for_next_command(true);
+ thd->lex->sql_command= SQLCOM_ROLLBACK;
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ /* Perform a client rollback, restore globals and signal
+ the victim only when all the resources have been
+ released */
+ thd->wsrep_cs().client_service().bf_rollback();
+ wsrep_reset_threadvars(thd);
+ /* Assign saved event_scheduler.data back before letting
+ client to continue. */
+ thd->event_scheduler.data= saved_esd;
+ thd->wsrep_cs().sync_rollback_complete();
+ }
+}
+
+static void wsrep_rollback_high_priority(THD *thd)
+{
+ WSREP_INFO("rollbacker aborting SR thd: (%lld %llu)",
+ thd->thread_id, (long long)thd->real_id);
+ DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority);
+ /* Must be streaming and must have been removed from the
+ server state streaming appliers map. */
+ DBUG_ASSERT(thd->wsrep_trx().is_streaming());
+ DBUG_ASSERT(!Wsrep_server_state::instance().find_streaming_applier(
+ thd->wsrep_trx().server_id(),
+ thd->wsrep_trx().id()));
+ DBUG_ASSERT(thd->wsrep_applier_service);
+
+ /* Fragment removal should happen before rollback to make
+ the transaction non-observable in SR table after the rollback
+ completes. For correctness the order does not matter here,
+ but currently it is mandated by checks in some MTR tests. */
+ wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
+ Wsrep_storage_service* storage_service=
+ static_cast<Wsrep_storage_service*>(
+ Wsrep_server_state::instance().server_service().storage_service(
+ *thd->wsrep_applier_service));
+ storage_service->store_globals();
+ storage_service->adopt_transaction(thd->wsrep_trx());
+ storage_service->remove_fragments();
+ storage_service->commit(wsrep::ws_handle(transaction_id, 0),
+ wsrep::ws_meta());
+ Wsrep_server_state::instance().server_service().release_storage_service(storage_service);
+ wsrep_store_threadvars(thd);
+ thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
+ wsrep::ws_meta());
+ thd->wsrep_applier_service->after_apply();
+ /* Will free THD */
+ Wsrep_server_state::instance().server_service()
+ .release_high_priority_service(thd->wsrep_applier_service);
+}
+
+static void wsrep_rollback_local(THD *thd)
+{
+ WSREP_INFO("Wsrep_rollback_local");
+ if (thd->wsrep_trx().is_streaming())
+ {
+ wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
+ Wsrep_storage_service* storage_service=
+ static_cast<Wsrep_storage_service*>(
+ Wsrep_server_state::instance().server_service().
+ storage_service(thd->wsrep_cs().client_service()));
+
+ storage_service->store_globals();
+ storage_service->adopt_transaction(thd->wsrep_trx());
+ storage_service->remove_fragments();
+ storage_service->commit(wsrep::ws_handle(transaction_id, 0),
+ wsrep::ws_meta());
+ Wsrep_server_state::instance().server_service().
+ release_storage_service(storage_service);
+ wsrep_store_threadvars(thd);
+ }
+ /* Set thd->event_scheduler.data temporarily to NULL to avoid
+ callbacks to threadpool wait_begin() during rollback. */
+ auto saved_esd= thd->event_scheduler.data;
+ thd->event_scheduler.data= 0;
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ /* prepare THD for rollback processing */
+ thd->reset_for_next_command();
+ thd->lex->sql_command= SQLCOM_ROLLBACK;
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ /* Perform a client rollback, restore globals and signal
+ the victim only when all the resources have been
+ released */
+ thd->wsrep_cs().client_service().bf_rollback();
+ wsrep_reset_threadvars(thd);
+ /* Assign saved event_scheduler.data back before letting
+ client to continue. */
+ thd->event_scheduler.data= saved_esd;
+ thd->wsrep_cs().sync_rollback_complete();
+ WSREP_DEBUG("rollbacker aborted thd: (%llu %llu)",
+ thd->thread_id, (long long)thd->real_id);
+}
+
static void wsrep_rollback_process(THD *rollbacker,
void *arg __attribute__((unused)))
{
@@ -170,119 +287,36 @@ static void wsrep_rollback_process(THD *rollbacker,
WSREP_DEBUG("rollbacker thd already aborted: %llu state: %d",
(long long)thd->real_id,
tx.state());
-
mysql_mutex_unlock(&thd->LOCK_thd_data);
continue;
}
mysql_mutex_unlock(&thd->LOCK_thd_data);
+ wsrep_reset_threadvars(rollbacker);
+ wsrep_store_threadvars(thd);
+ thd->wsrep_cs().acquire_ownership();
+
thd_proc_info(rollbacker, "wsrep aborter active");
- wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
+ /* Rollback methods below may free thd pointer. Do not try
+ to access it after method returns. */
if (thd->wsrep_trx().is_streaming() &&
thd->wsrep_trx().bf_aborted_in_total_order())
{
- thd->store_globals();
- thd->wsrep_cs().store_globals();
- if (thd->wsrep_cs().mode() == wsrep::client_state::m_high_priority)
- {
- DBUG_ASSERT(thd->wsrep_applier_service);
- thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
- wsrep::ws_meta());
- thd->wsrep_applier_service->after_apply();
- /* Will free THD */
- Wsrep_server_state::instance().server_service().
- release_high_priority_service(thd->wsrep_applier_service);
- }
- else
- {
- mysql_mutex_lock(&thd->LOCK_thd_data);
- /* prepare THD for rollback processing */
- thd->reset_for_next_command(true);
- thd->lex->sql_command= SQLCOM_ROLLBACK;
- mysql_mutex_unlock(&thd->LOCK_thd_data);
- /* Perform a client rollback, restore globals and signal
- the victim only when all the resources have been
- released */
- thd->wsrep_cs().client_service().bf_rollback();
- thd->reset_globals();
- thd->wsrep_cs().sync_rollback_complete();
- }
+ wsrep_rollback_streaming_aborted_by_toi(thd);
}
else if (wsrep_thd_is_applying(thd))
{
- WSREP_DEBUG("rollbacker aborting SR thd: (%lld %llu)",
- thd->thread_id, (long long)thd->real_id);
- DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority);
- /* Must be streaming and must have been removed from the
- server state streaming appliers map. */
- DBUG_ASSERT(thd->wsrep_trx().is_streaming());
- DBUG_ASSERT(!Wsrep_server_state::instance().find_streaming_applier(
- thd->wsrep_trx().server_id(),
- thd->wsrep_trx().id()));
- DBUG_ASSERT(thd->wsrep_applier_service);
-
- /* Fragment removal should happen before rollback to make
- the transaction non-observable in SR table after the rollback
- completes. For correctness the order does not matter here,
- but currently it is mandated by checks in some MTR tests. */
- Wsrep_storage_service* storage_service=
- static_cast<Wsrep_storage_service*>(
- Wsrep_server_state::instance().server_service().storage_service(
- *thd->wsrep_applier_service));
- storage_service->store_globals();
- storage_service->adopt_transaction(thd->wsrep_trx());
- storage_service->remove_fragments();
- storage_service->commit(wsrep::ws_handle(transaction_id, 0),
- wsrep::ws_meta());
- Wsrep_server_state::instance().server_service().release_storage_service(storage_service);
- thd->store_globals();
- thd->wsrep_cs().store_globals();
- thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
- wsrep::ws_meta());
- thd->wsrep_applier_service->after_apply();
- /* Will free THD */
- Wsrep_server_state::instance().server_service()
- .release_high_priority_service(thd->wsrep_applier_service);
-
+ wsrep_rollback_high_priority(thd);
}
else
{
- if (thd->wsrep_trx().is_streaming())
- {
- Wsrep_storage_service* storage_service=
- static_cast<Wsrep_storage_service*>(
- Wsrep_server_state::instance().server_service().
- storage_service(thd->wsrep_cs().client_service()));
-
- storage_service->store_globals();
- storage_service->adopt_transaction(thd->wsrep_trx());
- storage_service->remove_fragments();
- storage_service->commit(wsrep::ws_handle(transaction_id, 0),
- wsrep::ws_meta());
- Wsrep_server_state::instance().server_service().
- release_storage_service(storage_service);
- }
- thd->store_globals();
- thd->wsrep_cs().store_globals();
- mysql_mutex_lock(&thd->LOCK_thd_data);
- /* prepare THD for rollback processing */
- thd->reset_for_next_command();
- thd->lex->sql_command= SQLCOM_ROLLBACK;
- mysql_mutex_unlock(&thd->LOCK_thd_data);
- /* Perform a client rollback, restore globals and signal
- the victim only when all the resources have been
- released */
- thd->wsrep_cs().client_service().bf_rollback();
- thd->reset_globals();
- thd->wsrep_cs().sync_rollback_complete();
- WSREP_DEBUG("rollbacker aborted thd: (%llu %llu)",
- thd->thread_id, (long long)thd->real_id);
+ wsrep_rollback_local(thd);
}
-
+ wsrep_store_threadvars(rollbacker);
thd_proc_info(rollbacker, "wsrep aborter idle");
}
-
+
delete wsrep_rollback_queue;
wsrep_rollback_queue= NULL;
@@ -293,39 +327,6 @@ static void wsrep_rollback_process(THD *rollbacker,
DBUG_VOID_RETURN;
}
-static void wsrep_post_rollback_process(THD *post_rollbacker,
- void *arg __attribute__((unused)))
-{
- DBUG_ENTER("wsrep_post_rollback_process");
- THD* thd= NULL;
-
- WSREP_INFO("Starting post rollbacker thread %llu", post_rollbacker->thread_id);
- DBUG_ASSERT(!wsrep_post_rollback_queue);
- wsrep_post_rollback_queue= new Wsrep_thd_queue(post_rollbacker);
-
- while ((thd= wsrep_post_rollback_queue->pop_front()) != NULL)
- {
- thd->store_globals();
- wsrep::client_state& cs(thd->wsrep_cs());
- mysql_mutex_lock(&thd->LOCK_thd_data);
- DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborting);
- WSREP_DEBUG("post rollbacker calling post rollback for thd %llu, conf %s",
- thd->thread_id, wsrep_thd_transaction_state_str(thd));
-
- cs.after_rollback();
- DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborted);
- mysql_mutex_unlock(&thd->LOCK_thd_data);
- }
-
- delete wsrep_post_rollback_queue;
- wsrep_post_rollback_queue= NULL;
-
- DBUG_ASSERT(post_rollbacker->killed != NOT_KILLED);
- DBUG_PRINT("wsrep",("wsrep post rollbacker thread exiting"));
- WSREP_INFO("post rollbacker thread exiting %llu", post_rollbacker->thread_id);
- DBUG_VOID_RETURN;
-}
-
void wsrep_create_rollbacker()
{
if (wsrep_cluster_address && wsrep_cluster_address[0] != 0)
@@ -337,14 +338,6 @@ void wsrep_create_rollbacker()
/* create rollbacker */
if (create_wsrep_THD(args))
WSREP_WARN("Can't create thread to manage wsrep rollback");
-
- /* create post_rollbacker */
- args= new Wsrep_thd_args(wsrep_post_rollback_process,
- WSREP_ROLLBACKER_THREAD,
- pthread_self());
-
- if (create_wsrep_THD(args))
- WSREP_WARN("Can't create thread to manage wsrep post rollback");
}
}
@@ -438,3 +431,84 @@ void wsrep_thd_auto_increment_variables(THD* thd,
*offset= thd->variables.auto_increment_offset;
*increment= thd->variables.auto_increment_increment;
}
+
+int wsrep_create_threadvars()
+{
+ int ret= 0;
+ if (thread_handling == SCHEDULER_TYPES_COUNT)
+ {
+ /* Caller should have called wsrep_reset_threadvars() before this
+ method. */
+ DBUG_ASSERT(!pthread_getspecific(THR_KEY_mysys));
+ pthread_setspecific(THR_KEY_mysys, 0);
+ ret= my_thread_init();
+ }
+ return ret;
+}
+
+void wsrep_delete_threadvars()
+{
+ if (thread_handling == SCHEDULER_TYPES_COUNT)
+ {
+ /* The caller should have called wsrep_store_threadvars() before
+ this method. */
+ DBUG_ASSERT(pthread_getspecific(THR_KEY_mysys));
+ /* Reset psi state to avoid deallocating applier thread
+ psi_thread. */
+ PSI_thread *psi_thread= PSI_CALL_get_thread();
+#ifdef HAVE_PSI_INTERFACE
+ if (PSI_server)
+ {
+ PSI_server->set_thread(0);
+ }
+#endif /* HAVE_PSI_INTERFACE */
+ my_thread_end();
+ PSI_CALL_set_thread(psi_thread);
+ pthread_setspecific(THR_KEY_mysys, 0);
+ }
+}
+
+void wsrep_assign_from_threadvars(THD *thd)
+{
+ if (thread_handling == SCHEDULER_TYPES_COUNT)
+ {
+ st_my_thread_var *mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys);
+ DBUG_ASSERT(mysys_var);
+ thd->set_mysys_var(mysys_var);
+ }
+}
+
+Wsrep_threadvars wsrep_save_threadvars()
+{
+ return Wsrep_threadvars{
+ current_thd,
+ (st_my_thread_var*) pthread_getspecific(THR_KEY_mysys)
+ };
+}
+
+void wsrep_restore_threadvars(const Wsrep_threadvars& globals)
+{
+ set_current_thd(globals.cur_thd);
+ pthread_setspecific(THR_KEY_mysys, globals.mysys_var);
+}
+
+int wsrep_store_threadvars(THD *thd)
+{
+ if (thread_handling == SCHEDULER_TYPES_COUNT)
+ {
+ pthread_setspecific(THR_KEY_mysys, thd->mysys_var);
+ }
+ return thd->store_globals();
+}
+
+void wsrep_reset_threadvars(THD *thd)
+{
+ if (thread_handling == SCHEDULER_TYPES_COUNT)
+ {
+ pthread_setspecific(THR_KEY_mysys, 0);
+ }
+ else
+ {
+ thd->reset_globals();
+ }
+}