diff options
Diffstat (limited to 'sql/wsrep_thd.cc')
-rw-r--r-- | sql/wsrep_thd.cc | 787 |
1 files changed, 252 insertions, 535 deletions
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc index ce6d9688cb3..4f9915fa05f 100644 --- a/sql/wsrep_thd.cc +++ b/sql/wsrep_thd.cc @@ -15,412 +15,82 @@ #include "mariadb.h" #include "wsrep_thd.h" +#include "wsrep_trans_observer.h" +#include "wsrep_high_priority_service.h" +#include "wsrep_storage_service.h" #include "transaction.h" #include "rpl_rli.h" #include "log_event.h" #include "sql_parse.h" -//#include "global_threads.h" // LOCK_thread_count, etc. #include "sql_base.h" // close_thread_tables() #include "mysqld.h" // start_wsrep_THD(); - -#include "slave.h" // opt_log_slave_updates -#include "rpl_filter.h" +#include "wsrep_applier.h" // start_wsrep_THD(); +#include "mysql/service_wsrep.h" +#include "debug_sync.h" +#include "slave.h" #include "rpl_rli.h" #include "rpl_mi.h" -#if (__LP64__) -static volatile int64 wsrep_bf_aborts_counter(0); -#define WSREP_ATOMIC_LOAD_LONG my_atomic_load64 -#define WSREP_ATOMIC_ADD_LONG my_atomic_add64 -#else -static volatile int32 wsrep_bf_aborts_counter(0); -#define WSREP_ATOMIC_LOAD_LONG my_atomic_load32 -#define WSREP_ATOMIC_ADD_LONG my_atomic_add32 -#endif +static Wsrep_thd_queue* wsrep_rollback_queue= 0; +static Wsrep_thd_queue* wsrep_post_rollback_queue= 0; +static Atomic_counter<uint64_t> wsrep_bf_aborts_counter; + int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff, enum enum_var_type scope) { - wsrep_local_bf_aborts = WSREP_ATOMIC_LOAD_LONG(&wsrep_bf_aborts_counter); - var->type = SHOW_LONGLONG; - var->value = (char*)&wsrep_local_bf_aborts; + wsrep_local_bf_aborts= wsrep_bf_aborts_counter; + var->type= SHOW_LONGLONG; + var->value= (char*)&wsrep_local_bf_aborts; return 0; } -/* must have (&thd->LOCK_thd_data) */ -void wsrep_client_rollback(THD *thd) -{ - WSREP_DEBUG("client rollback due to BF abort for (%lld), query: %s", - (longlong) thd->thread_id, thd->query()); - - WSREP_ATOMIC_ADD_LONG(&wsrep_bf_aborts_counter, 1); - - thd->wsrep_conflict_state= ABORTING; - mysql_mutex_unlock(&thd->LOCK_thd_data); - trans_rollback(thd); - - if (thd->locked_tables_mode && thd->lock) - { - WSREP_DEBUG("unlocking tables for BF abort (%lld)", - (longlong) thd->thread_id); - thd->locked_tables_list.unlock_locked_tables(thd); - thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); - } - - if (thd->global_read_lock.is_acquired()) - { - WSREP_DEBUG("unlocking GRL for BF abort (%lld)", - (longlong) thd->thread_id); - thd->global_read_lock.unlock_global_read_lock(thd); - } - - /* Release transactional metadata locks. */ - thd->mdl_context.release_transactional_locks(); - - /* release explicit MDL locks */ - thd->mdl_context.release_explicit_locks(); - - if (thd->get_binlog_table_maps()) - { - WSREP_DEBUG("clearing binlog table map for BF abort (%lld)", - (longlong) thd->thread_id); - thd->clear_binlog_table_maps(); - } - mysql_mutex_lock(&thd->LOCK_thd_data); - thd->wsrep_conflict_state= ABORTED; -} - -#define NUMBER_OF_FIELDS_TO_IDENTIFY_COORDINATOR 1 -#define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2 - -static rpl_group_info* wsrep_relay_group_init(const char* log_fname) -{ - Relay_log_info* rli= new Relay_log_info(false); - - if (!rli->relay_log.description_event_for_exec) - { - rli->relay_log.description_event_for_exec= - new Format_description_log_event(4); - } - - static LEX_CSTRING connection_name= { STRING_WITH_LEN("wsrep") }; - - /* - Master_info's constructor initializes rpl_filter by either an already - constructed Rpl_filter object from global 'rpl_filters' list if the - specified connection name is same, or it constructs a new Rpl_filter - object and adds it to rpl_filters. This object is later destructed by - Mater_info's destructor by looking it up based on connection name in - rpl_filters list. - - However, since all Master_info objects created here would share same - connection name ("wsrep"), destruction of any of the existing Master_info - objects (in wsrep_return_from_bf_mode()) would free rpl_filter referenced - by any/all existing Master_info objects. - - In order to avoid that, we have added a check in Master_info's destructor - to not free the "wsrep" rpl_filter. It will eventually be freed by - free_all_rpl_filters() when server terminates. - */ - rli->mi = new Master_info(&connection_name, false); - - struct rpl_group_info *rgi= new rpl_group_info(rli); - rgi->thd= rli->sql_driver_thd= current_thd; - - if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on())) - { - rgi->deferred_events= new Deferred_log_events(rli); - } - - return rgi; -} - -static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow) +static void wsrep_replication_process(THD *thd, + void* arg __attribute__((unused))) { - shadow->options = thd->variables.option_bits; - shadow->server_status = thd->server_status; - shadow->wsrep_exec_mode = thd->wsrep_exec_mode; - shadow->vio = thd->net.vio; - - // Disable general logging on applier threads - thd->variables.option_bits |= OPTION_LOG_OFF; - // Enable binlogging if opt_log_slave_updates is set - if (opt_log_slave_updates) - thd->variables.option_bits|= OPTION_BIN_LOG; - else - thd->variables.option_bits&= ~(OPTION_BIN_LOG); + DBUG_ENTER("wsrep_replication_process"); - if (!thd->wsrep_rgi) thd->wsrep_rgi= wsrep_relay_group_init("wsrep_relay"); + Wsrep_applier_service applier_service(thd); /* thd->system_thread_info.rpl_sql_info isn't initialized. */ thd->system_thread_info.rpl_sql_info= new rpl_sql_thread_info(thd->wsrep_rgi->rli->mi->rpl_filter); - thd->wsrep_exec_mode= REPL_RECV; - thd->net.vio= 0; - thd->clear_error(); - - shadow->tx_isolation = thd->variables.tx_isolation; - thd->variables.tx_isolation = ISO_READ_COMMITTED; - thd->tx_isolation = ISO_READ_COMMITTED; - - shadow->db = thd->db.str; - shadow->db_length = thd->db.length; - shadow->user_time = thd->user_time; - shadow->row_count_func= thd->get_row_count_func(); - thd->reset_db(&null_clex_str); -} + WSREP_INFO("Starting applier thread %llu", thd->thread_id); + enum wsrep::provider::status + ret= Wsrep_server_state::get_provider().run_applier(&applier_service); -static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow) -{ - LEX_CSTRING db= {shadow->db, shadow->db_length }; - thd->variables.option_bits = shadow->options; - thd->server_status = shadow->server_status; - thd->wsrep_exec_mode = shadow->wsrep_exec_mode; - thd->net.vio = shadow->vio; - thd->variables.tx_isolation = shadow->tx_isolation; - thd->user_time = shadow->user_time; - thd->reset_db(&db); + WSREP_INFO("Applier thread exiting %d", ret); + mysql_mutex_lock(&LOCK_thread_count); + wsrep_close_applier(thd); + mysql_cond_broadcast(&COND_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); delete thd->system_thread_info.rpl_sql_info; delete thd->wsrep_rgi->rli->mi; delete thd->wsrep_rgi->rli; - + thd->wsrep_rgi->cleanup_after_session(); delete thd->wsrep_rgi; - thd->wsrep_rgi = NULL; - thd->set_row_count_func(shadow->row_count_func); -} - -void wsrep_replay_transaction(THD *thd) -{ - DBUG_ENTER("wsrep_replay_transaction"); - /* checking if BF trx must be replayed */ - if (thd->wsrep_conflict_state== MUST_REPLAY) { - DBUG_ASSERT(wsrep_thd_trx_seqno(thd)); - if (thd->wsrep_exec_mode!= REPL_RECV) { - if (thd->get_stmt_da()->is_sent()) - { - WSREP_ERROR("replay issue, thd has reported status already"); - } - - - /* - PS reprepare observer should have been removed already. - open_table() will fail if we have dangling observer here. - */ - DBUG_ASSERT(thd->m_reprepare_observer == NULL); - - struct da_shadow - { - enum Diagnostics_area::enum_diagnostics_status status; - ulonglong affected_rows; - ulonglong last_insert_id; - char message[MYSQL_ERRMSG_SIZE]; - }; - struct da_shadow da_status; - da_status.status= thd->get_stmt_da()->status(); - if (da_status.status == Diagnostics_area::DA_OK) - { - da_status.affected_rows= thd->get_stmt_da()->affected_rows(); - da_status.last_insert_id= thd->get_stmt_da()->last_insert_id(); - strmake(da_status.message, - thd->get_stmt_da()->message(), - sizeof(da_status.message)-1); - } - - thd->get_stmt_da()->reset_diagnostics_area(); - - thd->wsrep_conflict_state= REPLAYING; - mysql_mutex_unlock(&thd->LOCK_thd_data); + thd->wsrep_rgi= NULL; - thd->reset_for_next_command(); - thd->reset_killed(); - close_thread_tables(thd); - if (thd->locked_tables_mode && thd->lock) - { - WSREP_DEBUG("releasing table lock for replaying (%lld)", - (longlong) thd->thread_id); - thd->locked_tables_list.unlock_locked_tables(thd); - thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); - } - thd->mdl_context.release_transactional_locks(); - /* - Replaying will call MYSQL_START_STATEMENT when handling - BEGIN Query_log_event so end statement must be called before - replaying. - */ - MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da()); - thd->m_statement_psi= NULL; - thd->m_digest= NULL; - thd_proc_info(thd, "WSREP replaying trx"); - WSREP_DEBUG("replay trx: %s %lld", - thd->query() ? thd->query() : "void", - (long long)wsrep_thd_trx_seqno(thd)); - struct wsrep_thd_shadow shadow; - wsrep_prepare_bf_thd(thd, &shadow); - - /* From trans_begin() */ - thd->variables.option_bits|= OPTION_BEGIN; - thd->server_status|= SERVER_STATUS_IN_TRANS; - - int rcode = wsrep->replay_trx(wsrep, - &thd->wsrep_ws_handle, - (void *)thd); - - wsrep_return_from_bf_mode(thd, &shadow); - if (thd->wsrep_conflict_state!= REPLAYING) - WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state ); - - mysql_mutex_lock(&thd->LOCK_thd_data); - - switch (rcode) - { - case WSREP_OK: - thd->wsrep_conflict_state= NO_CONFLICT; - wsrep->post_commit(wsrep, &thd->wsrep_ws_handle); - WSREP_DEBUG("trx_replay successful for: %lld %lld", - (longlong) thd->thread_id, (longlong) thd->real_id); - if (thd->get_stmt_da()->is_sent()) - { - WSREP_WARN("replay ok, thd has reported status"); - } - else if (thd->get_stmt_da()->is_set()) - { - if (thd->get_stmt_da()->status() != Diagnostics_area::DA_OK && - thd->get_stmt_da()->status() != Diagnostics_area::DA_OK_BULK) - { - WSREP_WARN("replay ok, thd has error status %d", - thd->get_stmt_da()->status()); - } - } - else - { - if (da_status.status == Diagnostics_area::DA_OK) - { - my_ok(thd, - da_status.affected_rows, - da_status.last_insert_id, - da_status.message); - } - else - { - my_ok(thd); - } - } - break; - case WSREP_TRX_FAIL: - if (thd->get_stmt_da()->is_sent()) - { - WSREP_ERROR("replay failed, thd has reported status"); - } - else - { - WSREP_DEBUG("replay failed, rolling back"); - } - thd->wsrep_conflict_state= ABORTED; - wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle); - break; - default: - WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s", - rcode, thd->get_db(), - thd->query() ? thd->query() : "void"); - /* we're now in inconsistent state, must abort */ - - /* http://bazaar.launchpad.net/~codership/codership-mysql/5.6/revision/3962#sql/wsrep_thd.cc */ - mysql_mutex_unlock(&thd->LOCK_thd_data); - - unireg_abort(1); - break; - } - - wsrep_cleanup_transaction(thd); - - mysql_mutex_lock(&LOCK_wsrep_replaying); - wsrep_replaying--; - WSREP_DEBUG("replaying decreased: %d, thd: %lld", - wsrep_replaying, (longlong) thd->thread_id); - mysql_cond_broadcast(&COND_wsrep_replaying); - mysql_mutex_unlock(&LOCK_wsrep_replaying); - } - } - DBUG_VOID_RETURN; -} - -static void wsrep_replication_process(THD *thd) -{ - int rcode; - DBUG_ENTER("wsrep_replication_process"); - - struct wsrep_thd_shadow shadow; - wsrep_prepare_bf_thd(thd, &shadow); - - /* From trans_begin() */ - thd->variables.option_bits|= OPTION_BEGIN; - thd->server_status|= SERVER_STATUS_IN_TRANS; - - rcode = wsrep->recv(wsrep, (void *)thd); - DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode)); - - WSREP_INFO("applier thread exiting (code:%d)", rcode); - - switch (rcode) { - case WSREP_OK: - case WSREP_NOT_IMPLEMENTED: - case WSREP_CONN_FAIL: - /* provider does not support slave operations / disconnected from group, - * just close applier thread */ - break; - case WSREP_NODE_FAIL: - /* data inconsistency => SST is needed */ - /* Note: we cannot just blindly restart replication here, - * SST might require server restart if storage engines must be - * initialized after SST */ - WSREP_ERROR("node consistency compromised, aborting"); - wsrep_kill_mysql(thd); - break; - case WSREP_WARNING: - case WSREP_TRX_FAIL: - case WSREP_TRX_MISSING: - /* these suggests a bug in provider code */ - WSREP_WARN("bad return from recv() call: %d", rcode); - /* Shut down this node. */ - /* fall through */ - case WSREP_FATAL: - /* Cluster connectivity is lost. - * - * If applier was killed on purpose (KILL_CONNECTION), we - * avoid mysql shutdown. This is because the killer will then handle - * shutdown processing (or replication restarting) - */ - if (thd->killed != KILL_CONNECTION) - { - wsrep_kill_mysql(thd); - } - break; - } - - mysql_mutex_lock(&LOCK_thread_count); - wsrep_close_applier(thd); - mysql_cond_broadcast(&COND_thread_count); - mysql_mutex_unlock(&LOCK_thread_count); if(thd->has_thd_temporary_tables()) { WSREP_WARN("Applier %lld has temporary tables at exit.", thd->thread_id); } - wsrep_return_from_bf_mode(thd, &shadow); DBUG_VOID_RETURN; } -static bool create_wsrep_THD(wsrep_thd_processor_fun processor) +static bool create_wsrep_THD(Wsrep_thd_args* args) { ulong old_wsrep_running_threads= wsrep_running_threads; pthread_t unused; mysql_mutex_lock(&LOCK_thread_count); + bool res= pthread_create(&unused, &connection_attrib, start_wsrep_THD, - (void*)processor); + args); /* if starting a thread on server startup, wait until the this thread's THD is fully initialized (otherwise a THD initialization code might @@ -435,244 +105,291 @@ static bool create_wsrep_THD(wsrep_thd_processor_fun processor) void wsrep_create_appliers(long threads) { - if (!wsrep_connected) + /* Dont' start slave threads if wsrep-provider or wsrep-cluster-address + is not set. + */ + if (!WSREP_PROVIDER_EXISTS) + { + return; + } + + if (!wsrep_cluster_address || wsrep_cluster_address[0]== 0) { - /* see wsrep_replication_start() for the logic */ - if (wsrep_cluster_address && strlen(wsrep_cluster_address) && - wsrep_provider && strcasecmp(wsrep_provider, "none")) - { - WSREP_ERROR("Trying to launch slave threads before creating " - "connection at '%s'", wsrep_cluster_address); - assert(0); - } return; } long wsrep_threads=0; - while (wsrep_threads++ < threads) { - if (create_wsrep_THD(wsrep_replication_process)) + + while (wsrep_threads++ < threads) + { + Wsrep_thd_args* args(new Wsrep_thd_args(wsrep_replication_process, 0)); + if (create_wsrep_THD(args)) + { WSREP_WARN("Can't create thread to manage wsrep replication"); + } } } -static void wsrep_rollback_process(THD *thd) +static void wsrep_rollback_process(THD *rollbacker, + void *arg __attribute__((unused))) { DBUG_ENTER("wsrep_rollback_process"); - mysql_mutex_lock(&LOCK_wsrep_rollback); - wsrep_aborting_thd= NULL; - - while (thd->killed == NOT_KILLED) { - thd_proc_info(thd, "WSREP aborter idle"); - thd->mysys_var->current_mutex= &LOCK_wsrep_rollback; - thd->mysys_var->current_cond= &COND_wsrep_rollback; + THD* thd= NULL; + DBUG_ASSERT(!wsrep_rollback_queue); + wsrep_rollback_queue= new Wsrep_thd_queue(rollbacker); - mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback); + thd_proc_info(rollbacker, "wsrep aborter idle"); + while ((thd= wsrep_rollback_queue->pop_front()) != NULL) + { + mysql_mutex_lock(&thd->LOCK_thd_data); + wsrep::client_state& cs(thd->wsrep_cs()); + const wsrep::transaction& tx(cs.transaction()); + if (tx.state() == wsrep::transaction::s_aborted) + { + WSREP_DEBUG("rollbacker thd already aborted: %llu state: %d", + (long long)thd->real_id, + tx.state()); - WSREP_DEBUG("WSREP rollback thread wakes for signal"); + mysql_mutex_unlock(&thd->LOCK_thd_data); + continue; + } + mysql_mutex_unlock(&thd->LOCK_thd_data); - mysql_mutex_lock(&thd->mysys_var->mutex); - thd_proc_info(thd, "WSREP aborter active"); - thd->mysys_var->current_mutex= 0; - thd->mysys_var->current_cond= 0; - mysql_mutex_unlock(&thd->mysys_var->mutex); + thd_proc_info(rollbacker, "wsrep aborter active"); - /* check for false alarms */ - if (!wsrep_aborting_thd) + wsrep::transaction_id transaction_id(thd->wsrep_trx().id()); + if (thd->wsrep_trx().is_streaming() && + thd->wsrep_trx().bf_aborted_in_total_order()) { - WSREP_DEBUG("WSREP rollback thread has empty abort queue"); - } - /* process all entries in the queue */ - while (wsrep_aborting_thd) { - THD *aborting; - wsrep_aborting_thd_t next = wsrep_aborting_thd->next; - aborting = wsrep_aborting_thd->aborting_thd; - my_free(wsrep_aborting_thd); - wsrep_aborting_thd= next; - /* - * must release mutex, appliers my want to add more - * aborting thds in our work queue, while we rollback - */ - mysql_mutex_unlock(&LOCK_wsrep_rollback); - - mysql_mutex_lock(&aborting->LOCK_thd_data); - if (aborting->wsrep_conflict_state== ABORTED) + thd->store_globals(); + thd->wsrep_cs().store_globals(); + if (thd->wsrep_cs().mode() == wsrep::client_state::m_high_priority) { - WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d", - (long long)aborting->real_id, - aborting->wsrep_conflict_state); - - mysql_mutex_unlock(&aborting->LOCK_thd_data); - mysql_mutex_lock(&LOCK_wsrep_rollback); - continue; + DBUG_ASSERT(thd->wsrep_applier_service); + thd->wsrep_applier_service->rollback(wsrep::ws_handle(), + wsrep::ws_meta()); + thd->wsrep_applier_service->after_apply(); + /* Will free THD */ + Wsrep_server_state::instance().server_service(). + release_high_priority_service(thd->wsrep_applier_service); } - aborting->wsrep_conflict_state= ABORTING; - - mysql_mutex_unlock(&aborting->LOCK_thd_data); - - set_current_thd(aborting); - aborting->store_globals(); - - mysql_mutex_lock(&aborting->LOCK_thd_data); - wsrep_client_rollback(aborting); - WSREP_DEBUG("WSREP rollbacker aborted thd: (%lld %lld)", - (longlong) aborting->thread_id, - (longlong) aborting->real_id); - mysql_mutex_unlock(&aborting->LOCK_thd_data); - - set_current_thd(thd); + else + { + mysql_mutex_lock(&thd->LOCK_thd_data); + /* prepare THD for rollback processing */ + thd->reset_for_next_command(true); + thd->lex->sql_command= SQLCOM_ROLLBACK; + mysql_mutex_unlock(&thd->LOCK_thd_data); + /* Perform a client rollback, restore globals and signal + the victim only when all the resources have been + released */ + thd->wsrep_cs().client_service().bf_rollback(); + thd->reset_globals(); + thd->wsrep_cs().sync_rollback_complete(); + } + } + else if (wsrep_thd_is_applying(thd)) + { + WSREP_DEBUG("rollbacker aborting SR thd: (%lld %llu)", + thd->thread_id, (long long)thd->real_id); + DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority); + /* Must be streaming and must have been removed from the + server state streaming appliers map. */ + DBUG_ASSERT(thd->wsrep_trx().is_streaming()); + DBUG_ASSERT(!Wsrep_server_state::instance().find_streaming_applier( + thd->wsrep_trx().server_id(), + thd->wsrep_trx().id())); + DBUG_ASSERT(thd->wsrep_applier_service); + + /* Fragment removal should happen before rollback to make + the transaction non-observable in SR table after the rollback + completes. For correctness the order does not matter here, + but currently it is mandated by checks in some MTR tests. */ + Wsrep_storage_service* storage_service= + static_cast<Wsrep_storage_service*>( + Wsrep_server_state::instance().server_service().storage_service( + *thd->wsrep_applier_service)); + storage_service->store_globals(); + storage_service->adopt_transaction(thd->wsrep_trx()); + storage_service->remove_fragments(); + storage_service->commit(wsrep::ws_handle(transaction_id, 0), + wsrep::ws_meta()); + Wsrep_server_state::instance().server_service().release_storage_service(storage_service); thd->store_globals(); + thd->wsrep_cs().store_globals(); + thd->wsrep_applier_service->rollback(wsrep::ws_handle(), + wsrep::ws_meta()); + thd->wsrep_applier_service->after_apply(); + /* Will free THD */ + Wsrep_server_state::instance().server_service() + .release_high_priority_service(thd->wsrep_applier_service); - mysql_mutex_lock(&LOCK_wsrep_rollback); } + else + { + if (thd->wsrep_trx().is_streaming()) + { + Wsrep_storage_service* storage_service= + static_cast<Wsrep_storage_service*>( + Wsrep_server_state::instance().server_service(). + storage_service(thd->wsrep_cs().client_service())); + + storage_service->store_globals(); + storage_service->adopt_transaction(thd->wsrep_trx()); + storage_service->remove_fragments(); + storage_service->commit(wsrep::ws_handle(transaction_id, 0), + wsrep::ws_meta()); + Wsrep_server_state::instance().server_service(). + release_storage_service(storage_service); + } + thd->store_globals(); + thd->wsrep_cs().store_globals(); + mysql_mutex_lock(&thd->LOCK_thd_data); + /* prepare THD for rollback processing */ + thd->reset_for_next_command(); + thd->lex->sql_command= SQLCOM_ROLLBACK; + mysql_mutex_unlock(&thd->LOCK_thd_data); + /* Perform a client rollback, restore globals and signal + the victim only when all the resources have been + released */ + thd->wsrep_cs().client_service().bf_rollback(); + thd->reset_globals(); + thd->wsrep_cs().sync_rollback_complete(); + WSREP_DEBUG("rollbacker aborted thd: (%llu %llu)", + thd->thread_id, (long long)thd->real_id); + } + + thd_proc_info(rollbacker, "wsrep aborter idle"); } + + delete wsrep_rollback_queue; + wsrep_rollback_queue= NULL; - mysql_mutex_unlock(&LOCK_wsrep_rollback); sql_print_information("WSREP: rollbacker thread exiting"); + DBUG_ASSERT(rollbacker->killed != NOT_KILLED); DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting")); DBUG_VOID_RETURN; } -void wsrep_create_rollbacker() +static void wsrep_post_rollback_process(THD *post_rollbacker, + void *arg __attribute__((unused))) { - if (wsrep_provider && strcasecmp(wsrep_provider, "none")) - { - /* create rollbacker */ - if (create_wsrep_THD(wsrep_rollback_process)) - WSREP_WARN("Can't create thread to manage wsrep rollback"); - } -} + DBUG_ENTER("wsrep_post_rollback_process"); + THD* thd= NULL; -void wsrep_thd_set_PA_safe(void *thd_ptr, my_bool safe) -{ - if (thd_ptr) - { - THD* thd = (THD*)thd_ptr; - thd->wsrep_PA_safe = safe; - } -} + DBUG_ASSERT(!wsrep_post_rollback_queue); + wsrep_post_rollback_queue= new Wsrep_thd_queue(post_rollbacker); -enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd, my_bool sync) -{ - enum wsrep_conflict_state state = NO_CONFLICT; - if (thd) + while ((thd= wsrep_post_rollback_queue->pop_front()) != NULL) { - if (sync) mysql_mutex_lock(&thd->LOCK_thd_data); - - state = thd->wsrep_conflict_state; - if (sync) mysql_mutex_unlock(&thd->LOCK_thd_data); + thd->store_globals(); + wsrep::client_state& cs(thd->wsrep_cs()); + mysql_mutex_lock(&thd->LOCK_thd_data); + DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborting); + WSREP_DEBUG("post rollbacker calling post rollback for thd %llu, conf %s", + thd->thread_id, wsrep_thd_transaction_state_str(thd)); + + cs.after_rollback(); + DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborted); + mysql_mutex_unlock(&thd->LOCK_thd_data); } - return state; -} -my_bool wsrep_thd_is_wsrep(THD *thd) -{ - my_bool status = FALSE; - if (thd) - { - status = (WSREP(thd) && WSREP_PROVIDER_EXISTS); - } - return status; + delete wsrep_post_rollback_queue; + wsrep_post_rollback_queue= NULL; + + DBUG_ASSERT(post_rollbacker->killed != NOT_KILLED); + DBUG_PRINT("wsrep",("wsrep post rollbacker thread exiting")); + DBUG_VOID_RETURN; } -my_bool wsrep_thd_is_BF(THD *thd, my_bool sync) +void wsrep_create_rollbacker() { - my_bool status = FALSE; - if (thd) + if (wsrep_provider && strcasecmp(wsrep_provider, "none")) { - // THD can be BF only if provider exists - if (wsrep_thd_is_wsrep(thd)) - { - if (sync) - mysql_mutex_lock(&thd->LOCK_thd_data); + Wsrep_thd_args* args= new Wsrep_thd_args(wsrep_rollback_process, 0); - status = ((thd->wsrep_exec_mode == REPL_RECV) || - (thd->wsrep_exec_mode == TOTAL_ORDER)); - if (sync) - mysql_mutex_unlock(&thd->LOCK_thd_data); - } - } - return status; -} + /* create rollbacker */ + if (create_wsrep_THD(args)) + WSREP_WARN("Can't create thread to manage wsrep rollback"); -extern "C" -my_bool wsrep_thd_is_BF_or_commit(void *thd_ptr, my_bool sync) -{ - bool status = FALSE; - if (thd_ptr) - { - THD* thd = (THD*)thd_ptr; - if (sync) mysql_mutex_lock(&thd->LOCK_thd_data); - - status = ((thd->wsrep_exec_mode == REPL_RECV) || - (thd->wsrep_exec_mode == TOTAL_ORDER) || - (thd->wsrep_exec_mode == LOCAL_COMMIT)); - if (sync) mysql_mutex_unlock(&thd->LOCK_thd_data); - } - return status; + /* create post_rollbacker */ + args= new Wsrep_thd_args(wsrep_post_rollback_process, 0); + if (create_wsrep_THD(args)) + WSREP_WARN("Can't create thread to manage wsrep post rollback"); + } } -extern "C" -my_bool wsrep_thd_is_local(void *thd_ptr, my_bool sync) +/* + Start async rollback process + + Asserts thd->LOCK_thd_data ownership + */ +void wsrep_fire_rollbacker(THD *thd) { - bool status = FALSE; - if (thd_ptr) + DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborting); + DBUG_PRINT("wsrep",("enqueuing trx abort for %llu", thd->thread_id)); + WSREP_DEBUG("enqueuing trx abort for (%llu)", thd->thread_id); + if (wsrep_rollback_queue->push_back(thd)) { - THD* thd = (THD*)thd_ptr; - if (sync) mysql_mutex_lock(&thd->LOCK_thd_data); - - status = (thd->wsrep_exec_mode == LOCAL_STATE); - if (sync) mysql_mutex_unlock(&thd->LOCK_thd_data); + WSREP_WARN("duplicate thd %llu for rollbacker", + thd->thread_id); } - return status; } + int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal) { - THD *victim_thd = (THD *) victim_thd_ptr; - THD *bf_thd = (THD *) bf_thd_ptr; DBUG_ENTER("wsrep_abort_thd"); - + THD *victim_thd= (THD *) victim_thd_ptr; + THD *bf_thd= (THD *) bf_thd_ptr; + mysql_mutex_lock(&victim_thd->LOCK_thd_data); if ( (WSREP(bf_thd) || ( (WSREP_ON || bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU) && - bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) && - victim_thd) + wsrep_thd_is_toi(bf_thd)) ) && + victim_thd && + !wsrep_thd_is_aborting(victim_thd)) { - if ((victim_thd->wsrep_conflict_state == MUST_ABORT) || - (victim_thd->wsrep_conflict_state == ABORTED) || - (victim_thd->wsrep_conflict_state == ABORTING)) - { - WSREP_DEBUG("wsrep_abort_thd called by %llu with victim %llu already " - "aborted. Ignoring.", - (bf_thd) ? (long long)bf_thd->real_id : 0, - (long long)victim_thd->real_id); - DBUG_RETURN(1); - } - - WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ? - (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id); - ha_abort_transaction(bf_thd, victim_thd, signal); + WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ? + (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id); + mysql_mutex_unlock(&victim_thd->LOCK_thd_data); + ha_abort_transaction(bf_thd, victim_thd, signal); + mysql_mutex_lock(&victim_thd->LOCK_thd_data); } else { WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd); } - + mysql_mutex_unlock(&victim_thd->LOCK_thd_data); DBUG_RETURN(1); } -extern "C" -int wsrep_thd_in_locking_session(void *thd_ptr) +bool wsrep_bf_abort(const THD* bf_thd, THD* victim_thd) { - if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) { - return 1; + WSREP_LOG_THD((THD*)bf_thd, "BF aborter before"); + WSREP_LOG_THD(victim_thd, "victim before"); + wsrep::seqno bf_seqno(bf_thd->wsrep_trx().ws_meta().seqno()); + + if (WSREP(victim_thd) && !victim_thd->wsrep_trx().active()) + { + WSREP_DEBUG("wsrep_bf_abort, BF abort for non active transaction"); + wsrep_start_transaction(victim_thd, victim_thd->wsrep_next_trx_id()); } - return 0; -} -bool wsrep_thd_has_explicit_locks(THD *thd) -{ - assert(thd); - return thd->mdl_context.has_explicit_locks(); + bool ret; + if (wsrep_thd_is_toi(bf_thd)) + { + ret= victim_thd->wsrep_cs().total_order_bf_abort(bf_seqno); + } + else + { + ret= victim_thd->wsrep_cs().bf_abort(bf_seqno); + } + if (ret) + { + wsrep_bf_aborts_counter++; + } + return ret; } + |