summary refs log tree commit diff
path: root/sql/wsrep_thd.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_thd.cc')
-rw-r--r-- sql/wsrep_thd.cc 376
1 files changed, 369 insertions, 7 deletions
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc
index 9515fd550f2..77bd32a97d2 100644
--- a/sql/wsrep_thd.cc
+++ b/sql/wsrep_thd.cc
@@ -22,11 +22,16 @@
//#include "global_threads.h" // LOCK_thread_count, etc.
#include "sql_base.h" // close_thread_tables()
#include "mysqld.h" // start_wsrep_THD();
+#include "sql_show.h" // find_thread_by_id
#include "slave.h" // opt_log_slave_updates
#include "rpl_filter.h"
#include "rpl_rli.h"
#include "rpl_mi.h"
+#include "debug_sync.h"
+#include <list>
+
+extern std::list< wsrep_kill_t > wsrep_kill_list;
#if (__LP64__)
static volatile int64 wsrep_bf_aborts_counter(0);
@@ -280,7 +285,7 @@ void wsrep_replay_sp_transaction(THD* thd)
WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s",
rcode,
(thd->db.str ? thd->db.str : "(null)"),
- WSREP_QUERY(thd));
+ wsrep_thd_query(thd));
/* we're now in inconsistent state, must abort */
mysql_mutex_unlock(&thd->LOCK_thd_data);
unireg_abort(1);
@@ -528,12 +533,27 @@ static bool create_wsrep_THD(wsrep_thread_args* args, bool thread_count_lock)
mysql_mutex_lock(&LOCK_thread_count);
ulong old_wsrep_running_threads= wsrep_running_threads;
+ PSI_thread_key key;
- DBUG_ASSERT(args->thread_type == WSREP_APPLIER_THREAD ||
- args->thread_type == WSREP_ROLLBACKER_THREAD);
+#ifdef HAVE_PSI_INTERFACE
+ switch(args->thread_type)
+ {
+ case WSREP_APPLIER_THREAD:
+ key = key_wsrep_applier;
+ break;
+ case WSREP_ROLLBACKER_THREAD:
+ key = key_wsrep_rollbacker;
+ break;
+ case WSREP_KILLER_THREAD:
+ key = key_wsrep_killer;
+ break;
+ default:
+ WSREP_ERROR("Incorrect thread type %d", args->thread_type);
+ assert(0);
+ }
+#endif /* HAVE_PSI_INTERFACE */
- bool res= mysql_thread_create(args->thread_type == WSREP_APPLIER_THREAD
- ? key_wsrep_applier : key_wsrep_rollbacker,
+ bool res= mysql_thread_create(key,
&args->thread_id, &connection_attrib,
start_wsrep_THD, (void*)args);
@@ -561,6 +581,348 @@ static bool create_wsrep_THD(wsrep_thread_args* args, bool thread_count_lock)
return res;
}
+/**
+  Report that a transaction tried to abort a slave (applier) transaction
+  and terminate the server process.
+
+  Logs the two sequence numbers together with the likely causes through
+  WSREP_ERROR and then calls abort(), so this function does not return.
+
+  @param bf_seqno      seqno of the transaction that attempted the abort
+  @param victim_seqno  seqno of the slave transaction it tried to abort
+*/
+static void wsrep_abort_slave_trx(long long bf_seqno, long long victim_seqno)
+{
+ WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be "
+ "caused by:\n\t"
+ "1) unsupported configuration options combination, please check documentation.\n\t"
+ "2) a bug in the code.\n\t"
+ "3) a database corruption.\n Node consistency compromized, "
+ "need to abort. Restart the node to resync with cluster.",
+ bf_seqno, victim_seqno);
+ abort();
+}
+
+/**
+  Carry out one queued brute-force (BF) abort request against its victim.
+
+  Looks up the aborting (BF) thread and the victim thread by the ids stored
+  in the request, logs both, and then acts on the victim according to its
+  wsrep conflict state and wsrep query state.
+
+  @param item  kill request: BF and victim thread ids and trx ids, plus the
+               signal and wait_lock flags
+
+  @return 1 if the BF thread could not be found or abort_pre_commit()
+          returned WSREP_WARNING, 0 otherwise
+
+  NOTE(review): several paths end in wsrep_thd_awake() without an explicit
+  unlock of the victim's LOCK_thd_data / LOCK_thd_kill, while other paths
+  unlock both by hand; presumably wsrep_thd_awake() releases them -- confirm.
+  NOTE(review): bf_thd is still passed to WSREP_LOG_CONFLICT after its
+  LOCK_thd_kill has been released -- confirm that its lifetime is guaranteed.
+*/
+static int wsrep_kill(wsrep_kill_t* item)
+{
+  bool signal= item->signal;
+  unsigned long long victim_trx_id= static_cast<unsigned long long>(item->victim_trx_id);
+  unsigned long long bf_trx_id= static_cast<unsigned long long>(item->bf_trx_id);
+
+  // Note that find_thread_by_id will acquire LOCK_thd_kill mutex
+  // for thd if it's found
+  THD* bf_thd= find_thread_by_id(item->bf_thd_id, false, false);
+
+  if (!bf_thd)
+  {
+    WSREP_ERROR("BF thread: %lu not found", item->bf_thd_id);
+    assert(0);
+    /* assert() is compiled out with NDEBUG: bail out instead of falling
+       through and dereferencing a NULL bf_thd below. */
+    return(1);
+  }
+
+  long long bf_seqno= wsrep_thd_trx_seqno(bf_thd);
+
+  WSREP_DEBUG("Aborter %s trx_id: %llu thread: %lu "
+              "seqno: %lld query_state: %s conflict_state: %s query: %s",
+              wsrep_thd_is_BF(bf_thd, false) ? "BF" : "normal",
+              bf_trx_id,
+              item->bf_thd_id,
+              bf_seqno,
+              wsrep_thd_query_state_str(bf_thd),
+              wsrep_thd_conflict_state_str(bf_thd),
+              wsrep_thd_query(bf_thd));
+
+  // Note that we need to release LOCK_thd_kill mutex from BF thread
+  // to obey safe mutex ordering of LOCK_thread_count -> LOCK_thd_data
+  // that both are taken on find_thread_by_id below
+  mysql_mutex_unlock(&bf_thd->LOCK_thd_kill);
+
+  THD* thd= find_thread_by_id(item->victim_thd_id, false, true);
+
+  if (!thd)
+  {
+    /* Victim already gone: nothing left to abort. */
+    WSREP_DEBUG("Victim thread: %lu not found", item->victim_thd_id);
+    return(0);
+  }
+
+  WSREP_LOG_CONFLICT(bf_thd, thd, TRUE);
+
+  unsigned long victim_thread= item->victim_thd_id;
+  long long victim_seqno= wsrep_thd_trx_seqno(thd);
+
+  WSREP_DEBUG("Victim %s trx_id: %llu thread: %lu "
+              "seqno: %lld query_state: %s conflict_state: %s query: %s",
+              wsrep_thd_is_BF(thd, false) ? "BF" : "normal",
+              victim_trx_id,
+              victim_thread,
+              victim_seqno,
+              wsrep_thd_query_state_str(thd),
+              wsrep_thd_conflict_state_str(thd),
+              wsrep_thd_query(thd));
+
+  if (wsrep_thd_query_state(thd) == QUERY_EXITING)
+  {
+    WSREP_DEBUG("Victim query state QUERY_EXITING trx: %llu"
+                " thread: %lu",
+                victim_trx_id,
+                victim_thread);
+    mysql_mutex_unlock(&thd->LOCK_thd_data);
+    mysql_mutex_unlock(&thd->LOCK_thd_kill);
+    return(0);
+  }
+
+  if (wsrep_thd_exec_mode(thd) != LOCAL_STATE)
+  {
+    /* Only logged; processing continues for non-local victims too. */
+    WSREP_DEBUG("Victim withdraw of non local for BF trx: %llu "
+                ", thread: %lu exec_mode: %s",
+                victim_trx_id,
+                victim_thread,
+                wsrep_thd_exec_mode_str(thd));
+  }
+
+  switch (wsrep_thd_get_conflict_state(thd))
+  {
+  case NO_CONFLICT:
+    WSREP_DEBUG("Victim thread: %lu trx: %llu in NO_CONFLICT state",
+                victim_thread,
+                victim_trx_id);
+    wsrep_thd_set_conflict_state(thd, MUST_ABORT);
+    break;
+  case MUST_ABORT:
+    /* Already marked for abort: just wake the victim. */
+    WSREP_DEBUG("Victim thread: %lu trx: %llu in MUST_ABORT state",
+                victim_thread,
+                victim_trx_id);
+    wsrep_thd_awake(thd, signal);
+    return(0);
+  case ABORTED:
+  case ABORTING: // fall through
+  default:
+    WSREP_DEBUG("Victim thread: %lu trx: %llu in state: %s",
+                victim_thread,
+                victim_trx_id,
+                wsrep_thd_conflict_state_str(thd));
+    mysql_mutex_unlock(&thd->LOCK_thd_data);
+    mysql_mutex_unlock(&thd->LOCK_thd_kill);
+    return(0);
+  }
+
+  /* From here on the victim's conflict state is MUST_ABORT. */
+  switch (wsrep_thd_query_state(thd))
+  {
+  case QUERY_COMMITTING:
+  {
+    enum wsrep_status rcode=WSREP_OK;
+
+    WSREP_DEBUG("Victim kill trx QUERY_COMMITTING state thread: %lu trx: %llu",
+                victim_thread,
+                victim_trx_id);
+
+    if (wsrep_thd_exec_mode(thd) == REPL_RECV)
+    {
+      WSREP_DEBUG("Victim REPL_RECV abort slave thread: %lu trx: %llu"
+                  " bf_seqno: %lld victim_seqno: %lld",
+                  victim_thread,
+                  victim_trx_id,
+                  bf_seqno,
+                  victim_seqno);
+
+      /* Does not return: logs and calls abort(). */
+      wsrep_abort_slave_trx(bf_seqno, victim_seqno);
+    }
+    else
+    {
+      wsrep_t *wsrep= get_wsrep();
+
+      rcode= wsrep->abort_pre_commit(wsrep, bf_seqno,
+                                     (wsrep_trx_id_t)wsrep_thd_ws_handle(thd)->trx_id);
+
+      switch (rcode)
+      {
+      case WSREP_WARNING:
+      {
+        WSREP_DEBUG("Victim cancel commit warning thread: %lu trx: %llu",
+                    victim_thread,
+                    victim_trx_id);
+
+        wsrep_thd_awake(thd, signal);
+        return(1);
+      }
+      case WSREP_OK:
+        break;
+      default:
+      {
+        WSREP_ERROR("Victim cancel commit bad commit exit thread: "
+                    "%lu trx: %llu rcode: %d ",
+                    victim_thread,
+                    victim_trx_id,
+                    rcode);
+        /* unable to interrupt, must abort */
+        /* note: kill_mysql() will block, if we cannot.
+         * kill the lock holder first. */
+        abort();
+        break;
+      }
+      }
+    }
+
+    wsrep_thd_awake(thd, signal);
+    break;
+  }
+  case QUERY_EXEC:
+  {
+    /* it is possible that victim trx is itself waiting for some
+     * other lock. We need to cancel this waiting */
+    WSREP_DEBUG("Victim kill trx QUERY_EXEC state thread: %lu trx: %llu",
+                victim_thread, victim_trx_id);
+
+    bool wait_lock= item->wait_lock;
+
+    if (wait_lock)
+    {
+      WSREP_DEBUG("Victim thread: %lu trx: %llu has lock wait flag",
+                  victim_thread,
+                  victim_trx_id);
+
+      wsrep_thd_awake(thd, signal);
+    }
+    else
+    {
+      /* Abort currently executing query */
+      WSREP_DEBUG("Kill query for victim thread: %lu trx: %llu",
+                  victim_thread,
+                  victim_trx_id);
+
+      wsrep_thd_awake(thd, signal);
+
+      /* for BF thd, we need to prevent him from committing */
+      if (wsrep_thd_exec_mode(thd) == REPL_RECV)
+      {
+        WSREP_DEBUG("Victim REPL_RECV abort slave for thread: "
+                    "%lu trx: %llu"
+                    " bf_seqno: %lld victim_seqno: %lld",
+                    victim_thread,
+                    victim_trx_id,
+                    bf_seqno,
+                    victim_seqno);
+
+        wsrep_abort_slave_trx(bf_seqno, victim_seqno);
+      }
+    }
+    break;
+  }
+  case QUERY_IDLE:
+  {
+    WSREP_DEBUG("Victim kill trx QUERY_IDLE state thread: %lu trx: %llu",
+                victim_thread,
+                victim_trx_id);
+
+    if (wsrep_thd_exec_mode(thd) == REPL_RECV)
+    {
+      WSREP_DEBUG("Victim REPL_RECV kill BF IDLE, thread: %lu trx: "
+                  "%llu bf_seqno: %lld victim_seqno: %lld",
+                  victim_thread,
+                  victim_trx_id,
+                  bf_seqno,
+                  victim_seqno);
+
+      wsrep_thd_UNLOCK(thd);
+      wsrep_abort_slave_trx(bf_seqno, victim_seqno);
+      return(0);
+    }
+
+    /* This will lock thd from proceeding after net_read() */
+    wsrep_thd_set_conflict_state(thd, ABORTING);
+
+    /* Queue the victim on the aborting-thd list unless it is already
+       there; the list is accessed between wsrep_lock_rollback() and
+       wsrep_unlock_rollback(). */
+    wsrep_lock_rollback();
+
+    if (wsrep_aborting_thd_contains(thd))
+    {
+      WSREP_WARN("Victim is duplicate thd aborter thread: %lu trx: %llu",
+                 victim_thread,
+                 victim_trx_id);
+    }
+    else
+    {
+      wsrep_aborting_thd_enqueue(thd);
+      WSREP_DEBUG("Enqueuing victim thread: %lu trx: %llu for abort",
+                  victim_thread,
+                  victim_trx_id);
+    }
+
+    wsrep_unlock_rollback();
+    mysql_mutex_unlock(&thd->LOCK_thd_data);
+    mysql_mutex_unlock(&thd->LOCK_thd_kill);
+
+    break;
+  }
+  default:
+  {
+    WSREP_WARN("Victim thread: %lu trx: %llu in bad wsrep query state: %s",
+               victim_thread,
+               victim_trx_id,
+               wsrep_thd_query_state_str(thd));
+
+    /* NOTE(review): with NDEBUG this path returns without waking the
+       victim or unlocking its mutexes -- confirm whether that is intended. */
+    assert(0);
+    break;
+  }
+  }
+
+  return(0);
+}
+
+/**
+  Main loop of the wsrep background killer thread.
+
+  Holds LOCK_wsrep_kill, sleeps on COND_wsrep_kill while wsrep_kill_list is
+  empty, and hands every queued entry to wsrep_kill(). The loop ends once
+  thd->killed is no longer NOT_KILLED.
+
+  @param thd  THD of the killer thread itself
+*/
+static void wsrep_process_kill(THD *thd)
+{
+  DBUG_ENTER("wsrep_process_kill");
+
+  mysql_mutex_lock(&LOCK_wsrep_kill);
+
+  WSREP_DEBUG("WSREP killer thread started");
+
+  while (thd->killed == NOT_KILLED)
+  {
+    thd_proc_info(thd, "wsrep killer idle");
+    thd->mysys_var->current_mutex= &LOCK_wsrep_kill;
+    thd->mysys_var->current_cond= &COND_wsrep_kill;
+
+    /*
+      Sleep only when there is nothing to do. A request that was queued
+      (and signalled) before this thread got here would otherwise be left
+      waiting until some later signal arrives.
+    */
+    if (wsrep_kill_list.empty())
+    {
+      mysql_cond_wait(&COND_wsrep_kill,&LOCK_wsrep_kill);
+
+      WSREP_DEBUG("WSREP killer thread wakes for signal");
+    }
+
+    mysql_mutex_lock(&thd->mysys_var->mutex);
+    thd_proc_info(thd, "wsrep killer active");
+    thd->mysys_var->current_mutex= 0;
+    thd->mysys_var->current_cond= 0;
+    mysql_mutex_unlock(&thd->mysys_var->mutex);
+
+    /* process all entries in the queue */
+    while (!wsrep_kill_list.empty())
+    {
+      /* Work on a copy: the list mutex is dropped while killing. The
+         entry itself stays at the front of the list until it is done. */
+      wsrep_kill_t to_be_killed= wsrep_kill_list.front();
+      // Release list mutex while we kill one thread
+      mysql_mutex_unlock(&LOCK_wsrep_kill);
+      wsrep_kill(&to_be_killed);
+      mysql_mutex_lock(&LOCK_wsrep_kill);
+      wsrep_kill_list.pop_front();
+    }
+  }
+
+  assert(wsrep_kill_list.empty());
+
+  mysql_mutex_unlock(&LOCK_wsrep_kill);
+  sql_print_information("WSREP: killer thread exiting");
+  DBUG_PRINT("wsrep",("wsrep killer thread exiting"));
+  DBUG_VOID_RETURN;
+}
+
+
+/**
+  Start the wsrep background killer thread.
+
+  Allocates a wsrep_thread_args, marks it as WSREP_KILLER_THREAD with
+  wsrep_process_kill as its processor, and passes it to create_wsrep_THD().
+  Failures are logged; if thread creation fails the arguments are freed
+  here. On success the arguments are not freed by this function
+  (presumably the new thread owns them -- confirm).
+*/
+void wsrep_create_killer()
+{
+  wsrep_thread_args* arg=
+    (wsrep_thread_args*)my_malloc(sizeof(wsrep_thread_args), MYF(0));
+
+  if (arg == NULL)
+  {
+    WSREP_ERROR("Can't allocate memory for wsrep background killer thread");
+    assert(0);
+    /* assert() is compiled out with NDEBUG: do not go on to write
+       through a NULL pointer. */
+    return;
+  }
+
+  arg->thread_type = WSREP_KILLER_THREAD;
+  arg->processor = wsrep_process_kill;
+
+  if (create_wsrep_THD(arg, false))
+  {
+    WSREP_WARN("Can't create thread to manage wsrep background kill");
+    my_free(arg);
+  }
+}
+
bool wsrep_create_appliers(long threads, bool thread_count_lock)
{
if (!wsrep_connected)
@@ -656,7 +1018,7 @@ static void wsrep_rollback_process(THD *thd)
mysql_mutex_unlock(&aborting->LOCK_thd_data);
- set_current_thd(aborting);
+ set_current_thd(aborting);
aborting->store_globals();
mysql_mutex_lock(&aborting->LOCK_thd_data);
@@ -666,7 +1028,7 @@ static void wsrep_rollback_process(THD *thd)
(longlong) aborting->real_id);
mysql_mutex_unlock(&aborting->LOCK_thd_data);
- set_current_thd(thd);
+ set_current_thd(thd);
thd->store_globals();
mysql_mutex_lock(&LOCK_wsrep_rollback);