diff options
Diffstat (limited to 'sql/wsrep_thd.cc')
-rw-r--r-- | sql/wsrep_thd.cc | 346 |
1 files changed, 210 insertions, 136 deletions
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc index addf98561de..50f0376f674 100644 --- a/sql/wsrep_thd.cc +++ b/sql/wsrep_thd.cc @@ -31,8 +31,9 @@ #include "rpl_rli.h" #include "rpl_mi.h" +extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys); + static Wsrep_thd_queue* wsrep_rollback_queue= 0; -static Wsrep_thd_queue* wsrep_post_rollback_queue= 0; static Atomic_counter<uint64_t> wsrep_bf_aborts_counter; @@ -149,6 +150,122 @@ void wsrep_create_appliers(long threads) } } +static void wsrep_rollback_streaming_aborted_by_toi(THD *thd) +{ + WSREP_INFO("wsrep_rollback_streaming_aborted_by_toi"); + /* Set thd->event_scheduler.data temporarily to NULL to avoid + callbacks to threadpool wait_begin() during rollback. */ + auto saved_esd= thd->event_scheduler.data; + thd->event_scheduler.data= 0; + if (thd->wsrep_cs().mode() == wsrep::client_state::m_high_priority) + { + DBUG_ASSERT(!saved_esd); + DBUG_ASSERT(thd->wsrep_applier_service); + thd->wsrep_applier_service->rollback(wsrep::ws_handle(), + wsrep::ws_meta()); + thd->wsrep_applier_service->after_apply(); + /* Will free THD */ + Wsrep_server_state::instance().server_service(). + release_high_priority_service(thd->wsrep_applier_service); + } + else + { + mysql_mutex_lock(&thd->LOCK_thd_data); + /* prepare THD for rollback processing */ + thd->reset_for_next_command(true); + thd->lex->sql_command= SQLCOM_ROLLBACK; + mysql_mutex_unlock(&thd->LOCK_thd_data); + /* Perform a client rollback, restore globals and signal + the victim only when all the resources have been + released */ + thd->wsrep_cs().client_service().bf_rollback(); + wsrep_reset_threadvars(thd); + /* Assign saved event_scheduler.data back before letting + client to continue. */ + thd->event_scheduler.data= saved_esd; + thd->wsrep_cs().sync_rollback_complete(); + } +} + +static void wsrep_rollback_high_priority(THD *thd) +{ + WSREP_INFO("rollbacker aborting SR thd: (%lld %llu)", + thd->thread_id, (long long)thd->real_id); + DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority); + /* Must be streaming and must have been removed from the + server state streaming appliers map. */ + DBUG_ASSERT(thd->wsrep_trx().is_streaming()); + DBUG_ASSERT(!Wsrep_server_state::instance().find_streaming_applier( + thd->wsrep_trx().server_id(), + thd->wsrep_trx().id())); + DBUG_ASSERT(thd->wsrep_applier_service); + + /* Fragment removal should happen before rollback to make + the transaction non-observable in SR table after the rollback + completes. For correctness the order does not matter here, + but currently it is mandated by checks in some MTR tests. */ + wsrep::transaction_id transaction_id(thd->wsrep_trx().id()); + Wsrep_storage_service* storage_service= + static_cast<Wsrep_storage_service*>( + Wsrep_server_state::instance().server_service().storage_service( + *thd->wsrep_applier_service)); + storage_service->store_globals(); + storage_service->adopt_transaction(thd->wsrep_trx()); + storage_service->remove_fragments(); + storage_service->commit(wsrep::ws_handle(transaction_id, 0), + wsrep::ws_meta()); + Wsrep_server_state::instance().server_service().release_storage_service(storage_service); + wsrep_store_threadvars(thd); + thd->wsrep_applier_service->rollback(wsrep::ws_handle(), + wsrep::ws_meta()); + thd->wsrep_applier_service->after_apply(); + /* Will free THD */ + Wsrep_server_state::instance().server_service() + .release_high_priority_service(thd->wsrep_applier_service); +} + +static void wsrep_rollback_local(THD *thd) +{ + WSREP_INFO("Wsrep_rollback_local"); + if (thd->wsrep_trx().is_streaming()) + { + wsrep::transaction_id transaction_id(thd->wsrep_trx().id()); + Wsrep_storage_service* storage_service= + static_cast<Wsrep_storage_service*>( + Wsrep_server_state::instance().server_service(). + storage_service(thd->wsrep_cs().client_service())); + + storage_service->store_globals(); + storage_service->adopt_transaction(thd->wsrep_trx()); + storage_service->remove_fragments(); + storage_service->commit(wsrep::ws_handle(transaction_id, 0), + wsrep::ws_meta()); + Wsrep_server_state::instance().server_service(). + release_storage_service(storage_service); + wsrep_store_threadvars(thd); + } + /* Set thd->event_scheduler.data temporarily to NULL to avoid + callbacks to threadpool wait_begin() during rollback. */ + auto saved_esd= thd->event_scheduler.data; + thd->event_scheduler.data= 0; + mysql_mutex_lock(&thd->LOCK_thd_data); + /* prepare THD for rollback processing */ + thd->reset_for_next_command(); + thd->lex->sql_command= SQLCOM_ROLLBACK; + mysql_mutex_unlock(&thd->LOCK_thd_data); + /* Perform a client rollback, restore globals and signal + the victim only when all the resources have been + released */ + thd->wsrep_cs().client_service().bf_rollback(); + wsrep_reset_threadvars(thd); + /* Assign saved event_scheduler.data back before letting + client to continue. */ + thd->event_scheduler.data= saved_esd; + thd->wsrep_cs().sync_rollback_complete(); + WSREP_DEBUG("rollbacker aborted thd: (%llu %llu)", + thd->thread_id, (long long)thd->real_id); +} + static void wsrep_rollback_process(THD *rollbacker, void *arg __attribute__((unused))) { @@ -170,119 +287,36 @@ static void wsrep_rollback_process(THD *rollbacker, WSREP_DEBUG("rollbacker thd already aborted: %llu state: %d", (long long)thd->real_id, tx.state()); - mysql_mutex_unlock(&thd->LOCK_thd_data); continue; } mysql_mutex_unlock(&thd->LOCK_thd_data); + wsrep_reset_threadvars(rollbacker); + wsrep_store_threadvars(thd); + thd->wsrep_cs().acquire_ownership(); + thd_proc_info(rollbacker, "wsrep aborter active"); - wsrep::transaction_id transaction_id(thd->wsrep_trx().id()); + /* Rollback methods below may free thd pointer. Do not try + to access it after method returns. */ if (thd->wsrep_trx().is_streaming() && thd->wsrep_trx().bf_aborted_in_total_order()) { - thd->store_globals(); - thd->wsrep_cs().store_globals(); - if (thd->wsrep_cs().mode() == wsrep::client_state::m_high_priority) - { - DBUG_ASSERT(thd->wsrep_applier_service); - thd->wsrep_applier_service->rollback(wsrep::ws_handle(), - wsrep::ws_meta()); - thd->wsrep_applier_service->after_apply(); - /* Will free THD */ - Wsrep_server_state::instance().server_service(). - release_high_priority_service(thd->wsrep_applier_service); - } - else - { - mysql_mutex_lock(&thd->LOCK_thd_data); - /* prepare THD for rollback processing */ - thd->reset_for_next_command(true); - thd->lex->sql_command= SQLCOM_ROLLBACK; - mysql_mutex_unlock(&thd->LOCK_thd_data); - /* Perform a client rollback, restore globals and signal - the victim only when all the resources have been - released */ - thd->wsrep_cs().client_service().bf_rollback(); - thd->reset_globals(); - thd->wsrep_cs().sync_rollback_complete(); - } + wsrep_rollback_streaming_aborted_by_toi(thd); } else if (wsrep_thd_is_applying(thd)) { - WSREP_DEBUG("rollbacker aborting SR thd: (%lld %llu)", - thd->thread_id, (long long)thd->real_id); - DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority); - /* Must be streaming and must have been removed from the - server state streaming appliers map. */ - DBUG_ASSERT(thd->wsrep_trx().is_streaming()); - DBUG_ASSERT(!Wsrep_server_state::instance().find_streaming_applier( - thd->wsrep_trx().server_id(), - thd->wsrep_trx().id())); - DBUG_ASSERT(thd->wsrep_applier_service); - - /* Fragment removal should happen before rollback to make - the transaction non-observable in SR table after the rollback - completes. For correctness the order does not matter here, - but currently it is mandated by checks in some MTR tests. */ - Wsrep_storage_service* storage_service= - static_cast<Wsrep_storage_service*>( - Wsrep_server_state::instance().server_service().storage_service( - *thd->wsrep_applier_service)); - storage_service->store_globals(); - storage_service->adopt_transaction(thd->wsrep_trx()); - storage_service->remove_fragments(); - storage_service->commit(wsrep::ws_handle(transaction_id, 0), - wsrep::ws_meta()); - Wsrep_server_state::instance().server_service().release_storage_service(storage_service); - thd->store_globals(); - thd->wsrep_cs().store_globals(); - thd->wsrep_applier_service->rollback(wsrep::ws_handle(), - wsrep::ws_meta()); - thd->wsrep_applier_service->after_apply(); - /* Will free THD */ - Wsrep_server_state::instance().server_service() - .release_high_priority_service(thd->wsrep_applier_service); - + wsrep_rollback_high_priority(thd); } else { - if (thd->wsrep_trx().is_streaming()) - { - Wsrep_storage_service* storage_service= - static_cast<Wsrep_storage_service*>( - Wsrep_server_state::instance().server_service(). - storage_service(thd->wsrep_cs().client_service())); - - storage_service->store_globals(); - storage_service->adopt_transaction(thd->wsrep_trx()); - storage_service->remove_fragments(); - storage_service->commit(wsrep::ws_handle(transaction_id, 0), - wsrep::ws_meta()); - Wsrep_server_state::instance().server_service(). - release_storage_service(storage_service); - } - thd->store_globals(); - thd->wsrep_cs().store_globals(); - mysql_mutex_lock(&thd->LOCK_thd_data); - /* prepare THD for rollback processing */ - thd->reset_for_next_command(); - thd->lex->sql_command= SQLCOM_ROLLBACK; - mysql_mutex_unlock(&thd->LOCK_thd_data); - /* Perform a client rollback, restore globals and signal - the victim only when all the resources have been - released */ - thd->wsrep_cs().client_service().bf_rollback(); - thd->reset_globals(); - thd->wsrep_cs().sync_rollback_complete(); - WSREP_DEBUG("rollbacker aborted thd: (%llu %llu)", - thd->thread_id, (long long)thd->real_id); + wsrep_rollback_local(thd); } - + wsrep_store_threadvars(rollbacker); thd_proc_info(rollbacker, "wsrep aborter idle"); } - + delete wsrep_rollback_queue; wsrep_rollback_queue= NULL; @@ -293,39 +327,6 @@ static void wsrep_rollback_process(THD *rollbacker, DBUG_VOID_RETURN; } -static void wsrep_post_rollback_process(THD *post_rollbacker, - void *arg __attribute__((unused))) -{ - DBUG_ENTER("wsrep_post_rollback_process"); - THD* thd= NULL; - - WSREP_INFO("Starting post rollbacker thread %llu", post_rollbacker->thread_id); - DBUG_ASSERT(!wsrep_post_rollback_queue); - wsrep_post_rollback_queue= new Wsrep_thd_queue(post_rollbacker); - - while ((thd= wsrep_post_rollback_queue->pop_front()) != NULL) - { - thd->store_globals(); - wsrep::client_state& cs(thd->wsrep_cs()); - mysql_mutex_lock(&thd->LOCK_thd_data); - DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborting); - WSREP_DEBUG("post rollbacker calling post rollback for thd %llu, conf %s", - thd->thread_id, wsrep_thd_transaction_state_str(thd)); - - cs.after_rollback(); - DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborted); - mysql_mutex_unlock(&thd->LOCK_thd_data); - } - - delete wsrep_post_rollback_queue; - wsrep_post_rollback_queue= NULL; - - DBUG_ASSERT(post_rollbacker->killed != NOT_KILLED); - DBUG_PRINT("wsrep",("wsrep post rollbacker thread exiting")); - WSREP_INFO("post rollbacker thread exiting %llu", post_rollbacker->thread_id); - DBUG_VOID_RETURN; -} - void wsrep_create_rollbacker() { if (wsrep_cluster_address && wsrep_cluster_address[0] != 0) @@ -337,14 +338,6 @@ void wsrep_create_rollbacker() /* create rollbacker */ if (create_wsrep_THD(args)) WSREP_WARN("Can't create thread to manage wsrep rollback"); - - /* create post_rollbacker */ - args= new Wsrep_thd_args(wsrep_post_rollback_process, - WSREP_ROLLBACKER_THREAD, - pthread_self()); - - if (create_wsrep_THD(args)) - WSREP_WARN("Can't create thread to manage wsrep post rollback"); } } @@ -438,3 +431,84 @@ void wsrep_thd_auto_increment_variables(THD* thd, *offset= thd->variables.auto_increment_offset; *increment= thd->variables.auto_increment_increment; } + +int wsrep_create_threadvars() +{ + int ret= 0; + if (thread_handling == SCHEDULER_TYPES_COUNT) + { + /* Caller should have called wsrep_reset_threadvars() before this + method. */ + DBUG_ASSERT(!pthread_getspecific(THR_KEY_mysys)); + pthread_setspecific(THR_KEY_mysys, 0); + ret= my_thread_init(); + } + return ret; +} + +void wsrep_delete_threadvars() +{ + if (thread_handling == SCHEDULER_TYPES_COUNT) + { + /* The caller should have called wsrep_store_threadvars() before + this method. */ + DBUG_ASSERT(pthread_getspecific(THR_KEY_mysys)); + /* Reset psi state to avoid deallocating applier thread + psi_thread. */ + PSI_thread *psi_thread= PSI_CALL_get_thread(); +#ifdef HAVE_PSI_INTERFACE + if (PSI_server) + { + PSI_server->set_thread(0); + } +#endif /* HAVE_PSI_INTERFACE */ + my_thread_end(); + PSI_CALL_set_thread(psi_thread); + pthread_setspecific(THR_KEY_mysys, 0); + } +} + +void wsrep_assign_from_threadvars(THD *thd) +{ + if (thread_handling == SCHEDULER_TYPES_COUNT) + { + st_my_thread_var *mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys); + DBUG_ASSERT(mysys_var); + thd->set_mysys_var(mysys_var); + } +} + +Wsrep_threadvars wsrep_save_threadvars() +{ + return Wsrep_threadvars{ + current_thd, + (st_my_thread_var*) pthread_getspecific(THR_KEY_mysys) + }; +} + +void wsrep_restore_threadvars(const Wsrep_threadvars& globals) +{ + set_current_thd(globals.cur_thd); + pthread_setspecific(THR_KEY_mysys, globals.mysys_var); +} + +int wsrep_store_threadvars(THD *thd) +{ + if (thread_handling == SCHEDULER_TYPES_COUNT) + { + pthread_setspecific(THR_KEY_mysys, thd->mysys_var); + } + return thd->store_globals(); +} + +void wsrep_reset_threadvars(THD *thd) +{ + if (thread_handling == SCHEDULER_TYPES_COUNT) + { + pthread_setspecific(THR_KEY_mysys, 0); + } + else + { + thd->reset_globals(); + } +} |