summaryrefslogtreecommitdiff
path: root/sql/wsrep_thd.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_thd.cc')
-rw-r--r--sql/wsrep_thd.cc787
1 files changed, 252 insertions, 535 deletions
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc
index ce6d9688cb3..4f9915fa05f 100644
--- a/sql/wsrep_thd.cc
+++ b/sql/wsrep_thd.cc
@@ -15,412 +15,82 @@
#include "mariadb.h"
#include "wsrep_thd.h"
+#include "wsrep_trans_observer.h"
+#include "wsrep_high_priority_service.h"
+#include "wsrep_storage_service.h"
#include "transaction.h"
#include "rpl_rli.h"
#include "log_event.h"
#include "sql_parse.h"
-//#include "global_threads.h" // LOCK_thread_count, etc.
#include "sql_base.h" // close_thread_tables()
#include "mysqld.h" // start_wsrep_THD();
-
-#include "slave.h" // opt_log_slave_updates
-#include "rpl_filter.h"
+#include "wsrep_applier.h" // start_wsrep_THD();
+#include "mysql/service_wsrep.h"
+#include "debug_sync.h"
+#include "slave.h"
#include "rpl_rli.h"
#include "rpl_mi.h"
-#if (__LP64__)
-static volatile int64 wsrep_bf_aborts_counter(0);
-#define WSREP_ATOMIC_LOAD_LONG my_atomic_load64
-#define WSREP_ATOMIC_ADD_LONG my_atomic_add64
-#else
-static volatile int32 wsrep_bf_aborts_counter(0);
-#define WSREP_ATOMIC_LOAD_LONG my_atomic_load32
-#define WSREP_ATOMIC_ADD_LONG my_atomic_add32
-#endif
+static Wsrep_thd_queue* wsrep_rollback_queue= 0;
+static Wsrep_thd_queue* wsrep_post_rollback_queue= 0;
+static Atomic_counter<uint64_t> wsrep_bf_aborts_counter;
+
int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff,
enum enum_var_type scope)
{
- wsrep_local_bf_aborts = WSREP_ATOMIC_LOAD_LONG(&wsrep_bf_aborts_counter);
- var->type = SHOW_LONGLONG;
- var->value = (char*)&wsrep_local_bf_aborts;
+ wsrep_local_bf_aborts= wsrep_bf_aborts_counter;
+ var->type= SHOW_LONGLONG;
+ var->value= (char*)&wsrep_local_bf_aborts;
return 0;
}
-/* must have (&thd->LOCK_thd_data) */
-void wsrep_client_rollback(THD *thd)
-{
- WSREP_DEBUG("client rollback due to BF abort for (%lld), query: %s",
- (longlong) thd->thread_id, thd->query());
-
- WSREP_ATOMIC_ADD_LONG(&wsrep_bf_aborts_counter, 1);
-
- thd->wsrep_conflict_state= ABORTING;
- mysql_mutex_unlock(&thd->LOCK_thd_data);
- trans_rollback(thd);
-
- if (thd->locked_tables_mode && thd->lock)
- {
- WSREP_DEBUG("unlocking tables for BF abort (%lld)",
- (longlong) thd->thread_id);
- thd->locked_tables_list.unlock_locked_tables(thd);
- thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
- }
-
- if (thd->global_read_lock.is_acquired())
- {
- WSREP_DEBUG("unlocking GRL for BF abort (%lld)",
- (longlong) thd->thread_id);
- thd->global_read_lock.unlock_global_read_lock(thd);
- }
-
- /* Release transactional metadata locks. */
- thd->mdl_context.release_transactional_locks();
-
- /* release explicit MDL locks */
- thd->mdl_context.release_explicit_locks();
-
- if (thd->get_binlog_table_maps())
- {
- WSREP_DEBUG("clearing binlog table map for BF abort (%lld)",
- (longlong) thd->thread_id);
- thd->clear_binlog_table_maps();
- }
- mysql_mutex_lock(&thd->LOCK_thd_data);
- thd->wsrep_conflict_state= ABORTED;
-}
-
-#define NUMBER_OF_FIELDS_TO_IDENTIFY_COORDINATOR 1
-#define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2
-
-static rpl_group_info* wsrep_relay_group_init(const char* log_fname)
-{
- Relay_log_info* rli= new Relay_log_info(false);
-
- if (!rli->relay_log.description_event_for_exec)
- {
- rli->relay_log.description_event_for_exec=
- new Format_description_log_event(4);
- }
-
- static LEX_CSTRING connection_name= { STRING_WITH_LEN("wsrep") };
-
- /*
- Master_info's constructor initializes rpl_filter by either an already
- constructed Rpl_filter object from global 'rpl_filters' list if the
- specified connection name is same, or it constructs a new Rpl_filter
- object and adds it to rpl_filters. This object is later destructed by
- Mater_info's destructor by looking it up based on connection name in
- rpl_filters list.
-
- However, since all Master_info objects created here would share same
- connection name ("wsrep"), destruction of any of the existing Master_info
- objects (in wsrep_return_from_bf_mode()) would free rpl_filter referenced
- by any/all existing Master_info objects.
-
- In order to avoid that, we have added a check in Master_info's destructor
- to not free the "wsrep" rpl_filter. It will eventually be freed by
- free_all_rpl_filters() when server terminates.
- */
- rli->mi = new Master_info(&connection_name, false);
-
- struct rpl_group_info *rgi= new rpl_group_info(rli);
- rgi->thd= rli->sql_driver_thd= current_thd;
-
- if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
- {
- rgi->deferred_events= new Deferred_log_events(rli);
- }
-
- return rgi;
-}
-
-static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow)
+static void wsrep_replication_process(THD *thd,
+ void* arg __attribute__((unused)))
{
- shadow->options = thd->variables.option_bits;
- shadow->server_status = thd->server_status;
- shadow->wsrep_exec_mode = thd->wsrep_exec_mode;
- shadow->vio = thd->net.vio;
-
- // Disable general logging on applier threads
- thd->variables.option_bits |= OPTION_LOG_OFF;
- // Enable binlogging if opt_log_slave_updates is set
- if (opt_log_slave_updates)
- thd->variables.option_bits|= OPTION_BIN_LOG;
- else
- thd->variables.option_bits&= ~(OPTION_BIN_LOG);
+ DBUG_ENTER("wsrep_replication_process");
- if (!thd->wsrep_rgi) thd->wsrep_rgi= wsrep_relay_group_init("wsrep_relay");
+ Wsrep_applier_service applier_service(thd);
/* thd->system_thread_info.rpl_sql_info isn't initialized. */
thd->system_thread_info.rpl_sql_info=
new rpl_sql_thread_info(thd->wsrep_rgi->rli->mi->rpl_filter);
- thd->wsrep_exec_mode= REPL_RECV;
- thd->net.vio= 0;
- thd->clear_error();
-
- shadow->tx_isolation = thd->variables.tx_isolation;
- thd->variables.tx_isolation = ISO_READ_COMMITTED;
- thd->tx_isolation = ISO_READ_COMMITTED;
-
- shadow->db = thd->db.str;
- shadow->db_length = thd->db.length;
- shadow->user_time = thd->user_time;
- shadow->row_count_func= thd->get_row_count_func();
- thd->reset_db(&null_clex_str);
-}
+ WSREP_INFO("Starting applier thread %llu", thd->thread_id);
+ enum wsrep::provider::status
+ ret= Wsrep_server_state::get_provider().run_applier(&applier_service);
-static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow)
-{
- LEX_CSTRING db= {shadow->db, shadow->db_length };
- thd->variables.option_bits = shadow->options;
- thd->server_status = shadow->server_status;
- thd->wsrep_exec_mode = shadow->wsrep_exec_mode;
- thd->net.vio = shadow->vio;
- thd->variables.tx_isolation = shadow->tx_isolation;
- thd->user_time = shadow->user_time;
- thd->reset_db(&db);
+ WSREP_INFO("Applier thread exiting %d", ret);
+ mysql_mutex_lock(&LOCK_thread_count);
+ wsrep_close_applier(thd);
+ mysql_cond_broadcast(&COND_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
delete thd->system_thread_info.rpl_sql_info;
delete thd->wsrep_rgi->rli->mi;
delete thd->wsrep_rgi->rli;
-
+
thd->wsrep_rgi->cleanup_after_session();
delete thd->wsrep_rgi;
- thd->wsrep_rgi = NULL;
- thd->set_row_count_func(shadow->row_count_func);
-}
-
-void wsrep_replay_transaction(THD *thd)
-{
- DBUG_ENTER("wsrep_replay_transaction");
- /* checking if BF trx must be replayed */
- if (thd->wsrep_conflict_state== MUST_REPLAY) {
- DBUG_ASSERT(wsrep_thd_trx_seqno(thd));
- if (thd->wsrep_exec_mode!= REPL_RECV) {
- if (thd->get_stmt_da()->is_sent())
- {
- WSREP_ERROR("replay issue, thd has reported status already");
- }
-
-
- /*
- PS reprepare observer should have been removed already.
- open_table() will fail if we have dangling observer here.
- */
- DBUG_ASSERT(thd->m_reprepare_observer == NULL);
-
- struct da_shadow
- {
- enum Diagnostics_area::enum_diagnostics_status status;
- ulonglong affected_rows;
- ulonglong last_insert_id;
- char message[MYSQL_ERRMSG_SIZE];
- };
- struct da_shadow da_status;
- da_status.status= thd->get_stmt_da()->status();
- if (da_status.status == Diagnostics_area::DA_OK)
- {
- da_status.affected_rows= thd->get_stmt_da()->affected_rows();
- da_status.last_insert_id= thd->get_stmt_da()->last_insert_id();
- strmake(da_status.message,
- thd->get_stmt_da()->message(),
- sizeof(da_status.message)-1);
- }
-
- thd->get_stmt_da()->reset_diagnostics_area();
-
- thd->wsrep_conflict_state= REPLAYING;
- mysql_mutex_unlock(&thd->LOCK_thd_data);
+ thd->wsrep_rgi= NULL;
- thd->reset_for_next_command();
- thd->reset_killed();
- close_thread_tables(thd);
- if (thd->locked_tables_mode && thd->lock)
- {
- WSREP_DEBUG("releasing table lock for replaying (%lld)",
- (longlong) thd->thread_id);
- thd->locked_tables_list.unlock_locked_tables(thd);
- thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
- }
- thd->mdl_context.release_transactional_locks();
- /*
- Replaying will call MYSQL_START_STATEMENT when handling
- BEGIN Query_log_event so end statement must be called before
- replaying.
- */
- MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
- thd->m_statement_psi= NULL;
- thd->m_digest= NULL;
- thd_proc_info(thd, "WSREP replaying trx");
- WSREP_DEBUG("replay trx: %s %lld",
- thd->query() ? thd->query() : "void",
- (long long)wsrep_thd_trx_seqno(thd));
- struct wsrep_thd_shadow shadow;
- wsrep_prepare_bf_thd(thd, &shadow);
-
- /* From trans_begin() */
- thd->variables.option_bits|= OPTION_BEGIN;
- thd->server_status|= SERVER_STATUS_IN_TRANS;
-
- int rcode = wsrep->replay_trx(wsrep,
- &thd->wsrep_ws_handle,
- (void *)thd);
-
- wsrep_return_from_bf_mode(thd, &shadow);
- if (thd->wsrep_conflict_state!= REPLAYING)
- WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state );
-
- mysql_mutex_lock(&thd->LOCK_thd_data);
-
- switch (rcode)
- {
- case WSREP_OK:
- thd->wsrep_conflict_state= NO_CONFLICT;
- wsrep->post_commit(wsrep, &thd->wsrep_ws_handle);
- WSREP_DEBUG("trx_replay successful for: %lld %lld",
- (longlong) thd->thread_id, (longlong) thd->real_id);
- if (thd->get_stmt_da()->is_sent())
- {
- WSREP_WARN("replay ok, thd has reported status");
- }
- else if (thd->get_stmt_da()->is_set())
- {
- if (thd->get_stmt_da()->status() != Diagnostics_area::DA_OK &&
- thd->get_stmt_da()->status() != Diagnostics_area::DA_OK_BULK)
- {
- WSREP_WARN("replay ok, thd has error status %d",
- thd->get_stmt_da()->status());
- }
- }
- else
- {
- if (da_status.status == Diagnostics_area::DA_OK)
- {
- my_ok(thd,
- da_status.affected_rows,
- da_status.last_insert_id,
- da_status.message);
- }
- else
- {
- my_ok(thd);
- }
- }
- break;
- case WSREP_TRX_FAIL:
- if (thd->get_stmt_da()->is_sent())
- {
- WSREP_ERROR("replay failed, thd has reported status");
- }
- else
- {
- WSREP_DEBUG("replay failed, rolling back");
- }
- thd->wsrep_conflict_state= ABORTED;
- wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle);
- break;
- default:
- WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s",
- rcode, thd->get_db(),
- thd->query() ? thd->query() : "void");
- /* we're now in inconsistent state, must abort */
-
- /* http://bazaar.launchpad.net/~codership/codership-mysql/5.6/revision/3962#sql/wsrep_thd.cc */
- mysql_mutex_unlock(&thd->LOCK_thd_data);
-
- unireg_abort(1);
- break;
- }
-
- wsrep_cleanup_transaction(thd);
-
- mysql_mutex_lock(&LOCK_wsrep_replaying);
- wsrep_replaying--;
- WSREP_DEBUG("replaying decreased: %d, thd: %lld",
- wsrep_replaying, (longlong) thd->thread_id);
- mysql_cond_broadcast(&COND_wsrep_replaying);
- mysql_mutex_unlock(&LOCK_wsrep_replaying);
- }
- }
- DBUG_VOID_RETURN;
-}
-
-static void wsrep_replication_process(THD *thd)
-{
- int rcode;
- DBUG_ENTER("wsrep_replication_process");
-
- struct wsrep_thd_shadow shadow;
- wsrep_prepare_bf_thd(thd, &shadow);
-
- /* From trans_begin() */
- thd->variables.option_bits|= OPTION_BEGIN;
- thd->server_status|= SERVER_STATUS_IN_TRANS;
-
- rcode = wsrep->recv(wsrep, (void *)thd);
- DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode));
-
- WSREP_INFO("applier thread exiting (code:%d)", rcode);
-
- switch (rcode) {
- case WSREP_OK:
- case WSREP_NOT_IMPLEMENTED:
- case WSREP_CONN_FAIL:
- /* provider does not support slave operations / disconnected from group,
- * just close applier thread */
- break;
- case WSREP_NODE_FAIL:
- /* data inconsistency => SST is needed */
- /* Note: we cannot just blindly restart replication here,
- * SST might require server restart if storage engines must be
- * initialized after SST */
- WSREP_ERROR("node consistency compromised, aborting");
- wsrep_kill_mysql(thd);
- break;
- case WSREP_WARNING:
- case WSREP_TRX_FAIL:
- case WSREP_TRX_MISSING:
- /* these suggests a bug in provider code */
- WSREP_WARN("bad return from recv() call: %d", rcode);
- /* Shut down this node. */
- /* fall through */
- case WSREP_FATAL:
- /* Cluster connectivity is lost.
- *
- * If applier was killed on purpose (KILL_CONNECTION), we
- * avoid mysql shutdown. This is because the killer will then handle
- * shutdown processing (or replication restarting)
- */
- if (thd->killed != KILL_CONNECTION)
- {
- wsrep_kill_mysql(thd);
- }
- break;
- }
-
- mysql_mutex_lock(&LOCK_thread_count);
- wsrep_close_applier(thd);
- mysql_cond_broadcast(&COND_thread_count);
- mysql_mutex_unlock(&LOCK_thread_count);
if(thd->has_thd_temporary_tables())
{
WSREP_WARN("Applier %lld has temporary tables at exit.",
thd->thread_id);
}
- wsrep_return_from_bf_mode(thd, &shadow);
DBUG_VOID_RETURN;
}
-static bool create_wsrep_THD(wsrep_thd_processor_fun processor)
+static bool create_wsrep_THD(Wsrep_thd_args* args)
{
ulong old_wsrep_running_threads= wsrep_running_threads;
pthread_t unused;
mysql_mutex_lock(&LOCK_thread_count);
+
bool res= pthread_create(&unused, &connection_attrib, start_wsrep_THD,
- (void*)processor);
+ args);
/*
if starting a thread on server startup, wait until the this thread's THD
is fully initialized (otherwise a THD initialization code might
@@ -435,244 +105,291 @@ static bool create_wsrep_THD(wsrep_thd_processor_fun processor)
void wsrep_create_appliers(long threads)
{
- if (!wsrep_connected)
+ /* Dont' start slave threads if wsrep-provider or wsrep-cluster-address
+ is not set.
+ */
+ if (!WSREP_PROVIDER_EXISTS)
+ {
+ return;
+ }
+
+ if (!wsrep_cluster_address || wsrep_cluster_address[0]== 0)
{
- /* see wsrep_replication_start() for the logic */
- if (wsrep_cluster_address && strlen(wsrep_cluster_address) &&
- wsrep_provider && strcasecmp(wsrep_provider, "none"))
- {
- WSREP_ERROR("Trying to launch slave threads before creating "
- "connection at '%s'", wsrep_cluster_address);
- assert(0);
- }
return;
}
long wsrep_threads=0;
- while (wsrep_threads++ < threads) {
- if (create_wsrep_THD(wsrep_replication_process))
+
+ while (wsrep_threads++ < threads)
+ {
+ Wsrep_thd_args* args(new Wsrep_thd_args(wsrep_replication_process, 0));
+ if (create_wsrep_THD(args))
+ {
WSREP_WARN("Can't create thread to manage wsrep replication");
+ }
}
}
-static void wsrep_rollback_process(THD *thd)
+static void wsrep_rollback_process(THD *rollbacker,
+ void *arg __attribute__((unused)))
{
DBUG_ENTER("wsrep_rollback_process");
- mysql_mutex_lock(&LOCK_wsrep_rollback);
- wsrep_aborting_thd= NULL;
-
- while (thd->killed == NOT_KILLED) {
- thd_proc_info(thd, "WSREP aborter idle");
- thd->mysys_var->current_mutex= &LOCK_wsrep_rollback;
- thd->mysys_var->current_cond= &COND_wsrep_rollback;
+ THD* thd= NULL;
+ DBUG_ASSERT(!wsrep_rollback_queue);
+ wsrep_rollback_queue= new Wsrep_thd_queue(rollbacker);
- mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback);
+ thd_proc_info(rollbacker, "wsrep aborter idle");
+ while ((thd= wsrep_rollback_queue->pop_front()) != NULL)
+ {
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ wsrep::client_state& cs(thd->wsrep_cs());
+ const wsrep::transaction& tx(cs.transaction());
+ if (tx.state() == wsrep::transaction::s_aborted)
+ {
+ WSREP_DEBUG("rollbacker thd already aborted: %llu state: %d",
+ (long long)thd->real_id,
+ tx.state());
- WSREP_DEBUG("WSREP rollback thread wakes for signal");
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ continue;
+ }
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
- mysql_mutex_lock(&thd->mysys_var->mutex);
- thd_proc_info(thd, "WSREP aborter active");
- thd->mysys_var->current_mutex= 0;
- thd->mysys_var->current_cond= 0;
- mysql_mutex_unlock(&thd->mysys_var->mutex);
+ thd_proc_info(rollbacker, "wsrep aborter active");
- /* check for false alarms */
- if (!wsrep_aborting_thd)
+ wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
+ if (thd->wsrep_trx().is_streaming() &&
+ thd->wsrep_trx().bf_aborted_in_total_order())
{
- WSREP_DEBUG("WSREP rollback thread has empty abort queue");
- }
- /* process all entries in the queue */
- while (wsrep_aborting_thd) {
- THD *aborting;
- wsrep_aborting_thd_t next = wsrep_aborting_thd->next;
- aborting = wsrep_aborting_thd->aborting_thd;
- my_free(wsrep_aborting_thd);
- wsrep_aborting_thd= next;
- /*
- * must release mutex, appliers my want to add more
- * aborting thds in our work queue, while we rollback
- */
- mysql_mutex_unlock(&LOCK_wsrep_rollback);
-
- mysql_mutex_lock(&aborting->LOCK_thd_data);
- if (aborting->wsrep_conflict_state== ABORTED)
+ thd->store_globals();
+ thd->wsrep_cs().store_globals();
+ if (thd->wsrep_cs().mode() == wsrep::client_state::m_high_priority)
{
- WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d",
- (long long)aborting->real_id,
- aborting->wsrep_conflict_state);
-
- mysql_mutex_unlock(&aborting->LOCK_thd_data);
- mysql_mutex_lock(&LOCK_wsrep_rollback);
- continue;
+ DBUG_ASSERT(thd->wsrep_applier_service);
+ thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
+ wsrep::ws_meta());
+ thd->wsrep_applier_service->after_apply();
+ /* Will free THD */
+ Wsrep_server_state::instance().server_service().
+ release_high_priority_service(thd->wsrep_applier_service);
}
- aborting->wsrep_conflict_state= ABORTING;
-
- mysql_mutex_unlock(&aborting->LOCK_thd_data);
-
- set_current_thd(aborting);
- aborting->store_globals();
-
- mysql_mutex_lock(&aborting->LOCK_thd_data);
- wsrep_client_rollback(aborting);
- WSREP_DEBUG("WSREP rollbacker aborted thd: (%lld %lld)",
- (longlong) aborting->thread_id,
- (longlong) aborting->real_id);
- mysql_mutex_unlock(&aborting->LOCK_thd_data);
-
- set_current_thd(thd);
+ else
+ {
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ /* prepare THD for rollback processing */
+ thd->reset_for_next_command(true);
+ thd->lex->sql_command= SQLCOM_ROLLBACK;
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ /* Perform a client rollback, restore globals and signal
+ the victim only when all the resources have been
+ released */
+ thd->wsrep_cs().client_service().bf_rollback();
+ thd->reset_globals();
+ thd->wsrep_cs().sync_rollback_complete();
+ }
+ }
+ else if (wsrep_thd_is_applying(thd))
+ {
+ WSREP_DEBUG("rollbacker aborting SR thd: (%lld %llu)",
+ thd->thread_id, (long long)thd->real_id);
+ DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority);
+ /* Must be streaming and must have been removed from the
+ server state streaming appliers map. */
+ DBUG_ASSERT(thd->wsrep_trx().is_streaming());
+ DBUG_ASSERT(!Wsrep_server_state::instance().find_streaming_applier(
+ thd->wsrep_trx().server_id(),
+ thd->wsrep_trx().id()));
+ DBUG_ASSERT(thd->wsrep_applier_service);
+
+ /* Fragment removal should happen before rollback to make
+ the transaction non-observable in SR table after the rollback
+ completes. For correctness the order does not matter here,
+ but currently it is mandated by checks in some MTR tests. */
+ Wsrep_storage_service* storage_service=
+ static_cast<Wsrep_storage_service*>(
+ Wsrep_server_state::instance().server_service().storage_service(
+ *thd->wsrep_applier_service));
+ storage_service->store_globals();
+ storage_service->adopt_transaction(thd->wsrep_trx());
+ storage_service->remove_fragments();
+ storage_service->commit(wsrep::ws_handle(transaction_id, 0),
+ wsrep::ws_meta());
+ Wsrep_server_state::instance().server_service().release_storage_service(storage_service);
thd->store_globals();
+ thd->wsrep_cs().store_globals();
+ thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
+ wsrep::ws_meta());
+ thd->wsrep_applier_service->after_apply();
+ /* Will free THD */
+ Wsrep_server_state::instance().server_service()
+ .release_high_priority_service(thd->wsrep_applier_service);
- mysql_mutex_lock(&LOCK_wsrep_rollback);
}
+ else
+ {
+ if (thd->wsrep_trx().is_streaming())
+ {
+ Wsrep_storage_service* storage_service=
+ static_cast<Wsrep_storage_service*>(
+ Wsrep_server_state::instance().server_service().
+ storage_service(thd->wsrep_cs().client_service()));
+
+ storage_service->store_globals();
+ storage_service->adopt_transaction(thd->wsrep_trx());
+ storage_service->remove_fragments();
+ storage_service->commit(wsrep::ws_handle(transaction_id, 0),
+ wsrep::ws_meta());
+ Wsrep_server_state::instance().server_service().
+ release_storage_service(storage_service);
+ }
+ thd->store_globals();
+ thd->wsrep_cs().store_globals();
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ /* prepare THD for rollback processing */
+ thd->reset_for_next_command();
+ thd->lex->sql_command= SQLCOM_ROLLBACK;
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ /* Perform a client rollback, restore globals and signal
+ the victim only when all the resources have been
+ released */
+ thd->wsrep_cs().client_service().bf_rollback();
+ thd->reset_globals();
+ thd->wsrep_cs().sync_rollback_complete();
+ WSREP_DEBUG("rollbacker aborted thd: (%llu %llu)",
+ thd->thread_id, (long long)thd->real_id);
+ }
+
+ thd_proc_info(rollbacker, "wsrep aborter idle");
}
+
+ delete wsrep_rollback_queue;
+ wsrep_rollback_queue= NULL;
- mysql_mutex_unlock(&LOCK_wsrep_rollback);
sql_print_information("WSREP: rollbacker thread exiting");
+ DBUG_ASSERT(rollbacker->killed != NOT_KILLED);
DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
DBUG_VOID_RETURN;
}
-void wsrep_create_rollbacker()
+static void wsrep_post_rollback_process(THD *post_rollbacker,
+ void *arg __attribute__((unused)))
{
- if (wsrep_provider && strcasecmp(wsrep_provider, "none"))
- {
- /* create rollbacker */
- if (create_wsrep_THD(wsrep_rollback_process))
- WSREP_WARN("Can't create thread to manage wsrep rollback");
- }
-}
+ DBUG_ENTER("wsrep_post_rollback_process");
+ THD* thd= NULL;
-void wsrep_thd_set_PA_safe(void *thd_ptr, my_bool safe)
-{
- if (thd_ptr)
- {
- THD* thd = (THD*)thd_ptr;
- thd->wsrep_PA_safe = safe;
- }
-}
+ DBUG_ASSERT(!wsrep_post_rollback_queue);
+ wsrep_post_rollback_queue= new Wsrep_thd_queue(post_rollbacker);
-enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd, my_bool sync)
-{
- enum wsrep_conflict_state state = NO_CONFLICT;
- if (thd)
+ while ((thd= wsrep_post_rollback_queue->pop_front()) != NULL)
{
- if (sync) mysql_mutex_lock(&thd->LOCK_thd_data);
-
- state = thd->wsrep_conflict_state;
- if (sync) mysql_mutex_unlock(&thd->LOCK_thd_data);
+ thd->store_globals();
+ wsrep::client_state& cs(thd->wsrep_cs());
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborting);
+ WSREP_DEBUG("post rollbacker calling post rollback for thd %llu, conf %s",
+ thd->thread_id, wsrep_thd_transaction_state_str(thd));
+
+ cs.after_rollback();
+ DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborted);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
}
- return state;
-}
-my_bool wsrep_thd_is_wsrep(THD *thd)
-{
- my_bool status = FALSE;
- if (thd)
- {
- status = (WSREP(thd) && WSREP_PROVIDER_EXISTS);
- }
- return status;
+ delete wsrep_post_rollback_queue;
+ wsrep_post_rollback_queue= NULL;
+
+ DBUG_ASSERT(post_rollbacker->killed != NOT_KILLED);
+ DBUG_PRINT("wsrep",("wsrep post rollbacker thread exiting"));
+ DBUG_VOID_RETURN;
}
-my_bool wsrep_thd_is_BF(THD *thd, my_bool sync)
+void wsrep_create_rollbacker()
{
- my_bool status = FALSE;
- if (thd)
+ if (wsrep_provider && strcasecmp(wsrep_provider, "none"))
{
- // THD can be BF only if provider exists
- if (wsrep_thd_is_wsrep(thd))
- {
- if (sync)
- mysql_mutex_lock(&thd->LOCK_thd_data);
+ Wsrep_thd_args* args= new Wsrep_thd_args(wsrep_rollback_process, 0);
- status = ((thd->wsrep_exec_mode == REPL_RECV) ||
- (thd->wsrep_exec_mode == TOTAL_ORDER));
- if (sync)
- mysql_mutex_unlock(&thd->LOCK_thd_data);
- }
- }
- return status;
-}
+ /* create rollbacker */
+ if (create_wsrep_THD(args))
+ WSREP_WARN("Can't create thread to manage wsrep rollback");
-extern "C"
-my_bool wsrep_thd_is_BF_or_commit(void *thd_ptr, my_bool sync)
-{
- bool status = FALSE;
- if (thd_ptr)
- {
- THD* thd = (THD*)thd_ptr;
- if (sync) mysql_mutex_lock(&thd->LOCK_thd_data);
-
- status = ((thd->wsrep_exec_mode == REPL_RECV) ||
- (thd->wsrep_exec_mode == TOTAL_ORDER) ||
- (thd->wsrep_exec_mode == LOCAL_COMMIT));
- if (sync) mysql_mutex_unlock(&thd->LOCK_thd_data);
- }
- return status;
+ /* create post_rollbacker */
+ args= new Wsrep_thd_args(wsrep_post_rollback_process, 0);
+ if (create_wsrep_THD(args))
+ WSREP_WARN("Can't create thread to manage wsrep post rollback");
+ }
}
-extern "C"
-my_bool wsrep_thd_is_local(void *thd_ptr, my_bool sync)
+/*
+ Start async rollback process
+
+ Asserts thd->LOCK_thd_data ownership
+ */
+void wsrep_fire_rollbacker(THD *thd)
{
- bool status = FALSE;
- if (thd_ptr)
+ DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborting);
+ DBUG_PRINT("wsrep",("enqueuing trx abort for %llu", thd->thread_id));
+ WSREP_DEBUG("enqueuing trx abort for (%llu)", thd->thread_id);
+ if (wsrep_rollback_queue->push_back(thd))
{
- THD* thd = (THD*)thd_ptr;
- if (sync) mysql_mutex_lock(&thd->LOCK_thd_data);
-
- status = (thd->wsrep_exec_mode == LOCAL_STATE);
- if (sync) mysql_mutex_unlock(&thd->LOCK_thd_data);
+ WSREP_WARN("duplicate thd %llu for rollbacker",
+ thd->thread_id);
}
- return status;
}
+
int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
{
- THD *victim_thd = (THD *) victim_thd_ptr;
- THD *bf_thd = (THD *) bf_thd_ptr;
DBUG_ENTER("wsrep_abort_thd");
-
+ THD *victim_thd= (THD *) victim_thd_ptr;
+ THD *bf_thd= (THD *) bf_thd_ptr;
+ mysql_mutex_lock(&victim_thd->LOCK_thd_data);
if ( (WSREP(bf_thd) ||
( (WSREP_ON || bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU) &&
- bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) &&
- victim_thd)
+ wsrep_thd_is_toi(bf_thd)) ) &&
+ victim_thd &&
+ !wsrep_thd_is_aborting(victim_thd))
{
- if ((victim_thd->wsrep_conflict_state == MUST_ABORT) ||
- (victim_thd->wsrep_conflict_state == ABORTED) ||
- (victim_thd->wsrep_conflict_state == ABORTING))
- {
- WSREP_DEBUG("wsrep_abort_thd called by %llu with victim %llu already "
- "aborted. Ignoring.",
- (bf_thd) ? (long long)bf_thd->real_id : 0,
- (long long)victim_thd->real_id);
- DBUG_RETURN(1);
- }
-
- WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ?
- (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id);
- ha_abort_transaction(bf_thd, victim_thd, signal);
+ WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ?
+ (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id);
+ mysql_mutex_unlock(&victim_thd->LOCK_thd_data);
+ ha_abort_transaction(bf_thd, victim_thd, signal);
+ mysql_mutex_lock(&victim_thd->LOCK_thd_data);
}
else
{
WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd);
}
-
+ mysql_mutex_unlock(&victim_thd->LOCK_thd_data);
DBUG_RETURN(1);
}
-extern "C"
-int wsrep_thd_in_locking_session(void *thd_ptr)
+bool wsrep_bf_abort(const THD* bf_thd, THD* victim_thd)
{
- if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) {
- return 1;
+ WSREP_LOG_THD((THD*)bf_thd, "BF aborter before");
+ WSREP_LOG_THD(victim_thd, "victim before");
+ wsrep::seqno bf_seqno(bf_thd->wsrep_trx().ws_meta().seqno());
+
+ if (WSREP(victim_thd) && !victim_thd->wsrep_trx().active())
+ {
+ WSREP_DEBUG("wsrep_bf_abort, BF abort for non active transaction");
+ wsrep_start_transaction(victim_thd, victim_thd->wsrep_next_trx_id());
}
- return 0;
-}
-bool wsrep_thd_has_explicit_locks(THD *thd)
-{
- assert(thd);
- return thd->mdl_context.has_explicit_locks();
+ bool ret;
+ if (wsrep_thd_is_toi(bf_thd))
+ {
+ ret= victim_thd->wsrep_cs().total_order_bf_abort(bf_seqno);
+ }
+ else
+ {
+ ret= victim_thd->wsrep_cs().bf_abort(bf_seqno);
+ }
+ if (ret)
+ {
+ wsrep_bf_aborts_counter++;
+ }
+ return ret;
}
+