diff options
Diffstat (limited to 'sql/handler.cc')
-rw-r--r-- | sql/handler.cc | 216 |
1 files changed, 155 insertions, 61 deletions
diff --git a/sql/handler.cc b/sql/handler.cc index b3481c7e429..cb3f156be81 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -55,6 +55,12 @@ #include "wsrep_mysqld.h" #include "wsrep.h" #include "wsrep_xid.h" +#ifdef WITH_WSREP +#include "wsrep_sr.h" +#endif +#include "wsrep_thd.h" +#include "wsrep_trans_observer.h" /* wsrep transaction hooks */ +#include "log.h" /* While we have legacy_db_type, we have this array to @@ -1168,8 +1174,8 @@ static int prepare_or_error(handlerton *ht, THD *thd, bool all) { /* avoid sending error, if we're going to replay the transaction */ #ifdef WITH_WSREP - if (ht != wsrep_hton || - err == EMSGSIZE || thd->wsrep_conflict_state != MUST_REPLAY) + if (ht->db_type != DB_TYPE_UNKNOWN || + err == EMSGSIZE || thd->wsrep_conflict_state() != MUST_REPLAY) #endif my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); } @@ -1248,6 +1254,18 @@ ha_check_and_coalesce_trx_read_only(THD *thd, Ha_trx_info *ha_list, for (ha_info= ha_list; ha_info; ha_info= ha_info->next()) { +#ifdef WITH_WSREP + /* + Thd has wsrep_schema.SR open and may operate it + during prepare phase, set InnoDB ha_info read_write. + */ + if (WSREP_CLIENT(thd) && thd->wsrep_is_streaming() && + wsrep_SR_store && wsrep_SR_store_type == WSREP_SR_STORE_TABLE && + ha_info->ht()->db_type == DB_TYPE_INNODB) + { + ha_info->set_trx_read_write(); + } +#endif /* WITH_WSREP */ if (ha_info->is_trx_read_write()) ++rw_ha_count; @@ -1460,6 +1478,13 @@ int ha_commit_trans(THD *thd, bool all) need_prepare_ordered= FALSE; need_commit_ordered= FALSE; xid= thd->transaction.xid_state.xid.get_my_xid(); +#ifdef WITH_WSREP + if (WSREP(thd) && wsrep_before_prepare(thd, all)) + { + wsrep_override_error(thd, ER_ERROR_DURING_COMMIT); + goto wsrep_err; + } +#endif /* WITH_WSREP */ for (Ha_trx_info *hi= ha_info; hi; hi= hi->next()) { @@ -1483,6 +1508,13 @@ int ha_commit_trans(THD *thd, bool all) } DEBUG_SYNC(thd, "ha_commit_trans_after_prepare"); DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE();); +#ifdef WITH_WSREP + if (WSREP(thd) && wsrep_after_prepare(thd, all)) + { + wsrep_override_error(thd, ER_ERROR_DURING_COMMIT); + goto err; + } +#endif /* WITH_WSREP */ #ifdef WITH_WSREP if (!error && WSREP_ON && wsrep_is_wsrep_xid(&thd->transaction.xid_state.xid)) @@ -1502,13 +1534,29 @@ int ha_commit_trans(THD *thd, bool all) cookie= tc_log->log_and_order(thd, xid, all, need_prepare_ordered, need_commit_ordered); if (!cookie) + { + WSREP_DEBUG("log_and_order has failed %lu %d", thd->thread_id, cookie); goto err; + } DEBUG_SYNC(thd, "ha_commit_trans_after_log_and_order"); DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE();); error= commit_one_phase_2(thd, all, trans, is_real_trans) ? 2 : 0; - +#ifdef WITH_WSREP + if (error) + { + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + if (thd->wsrep_exec_mode == LOCAL_COMMIT && + thd->wsrep_conflict_state() == MUST_ABORT) + { + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + (void)tc_log->unlog(cookie, xid); + goto wsrep_err; + } + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); +} +#endif /* WITH_WSREP */ DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE();); if (tc_log->unlog(cookie, xid)) { @@ -1530,6 +1578,20 @@ done: goto end; /* Come here if error and we need to rollback. */ +#ifdef WITH_WSREP + wsrep_err: + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + if (thd->wsrep_exec_mode == LOCAL_COMMIT && + thd->wsrep_conflict_state() == MUST_ABORT) + { + WSREP_DEBUG("BF abort has happened after prepare & certify"); + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + ha_rollback_trans(thd, TRUE); + } + else + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + +#endif /* WITH_WSREP */ err: error= 1; /* Transaction was rolled back */ /* @@ -1539,7 +1601,10 @@ err: */ if (!(thd->rgi_slave && thd->rgi_slave->is_parallel_exec)) ha_rollback_trans(thd, all); - + else + { + WSREP_DEBUG("WTF, rollback skipped %d %d",thd->rgi_slave, thd->rgi_slave->is_parallel_exec); + } end: if (rw_trans && mdl_request.ticket) { @@ -1673,7 +1738,6 @@ int ha_rollback_trans(THD *thd, bool all) */ DBUG_ASSERT(thd->transaction.stmt.ha_list == NULL || trans == &thd->transaction.stmt); - #ifdef HAVE_REPLICATION if (is_real_trans) { @@ -1708,6 +1772,9 @@ int ha_rollback_trans(THD *thd, bool all) DBUG_RETURN(1); } +#ifdef WITH_WSREP + (void) wsrep_before_rollback(thd, all); +#endif // WITH_WSREP if (ha_info) { /* Close all cursors that can not survive ROLLBACK */ @@ -1723,9 +1790,9 @@ int ha_rollback_trans(THD *thd, bool all) my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err); error=1; #ifdef WITH_WSREP - WSREP_WARN("handlerton rollback failed, thd %llu %lld conf %d SQL %s", - thd->thread_id, thd->query_id, thd->wsrep_conflict_state, - thd->query()); + WSREP_WARN("handlerton rollback failed, thd %lld %lld conf %d SQL %s", + thd->thread_id, thd->query_id, thd->wsrep_conflict_state(), + thd->query()); #endif /* WITH_WSREP */ } status_var_increment(thd->status_var.ha_rollback_count); @@ -1744,6 +1811,15 @@ int ha_rollback_trans(THD *thd, bool all) thd->transaction.xid_state.xa_state != XA_NOTR) thd->transaction.xid_state.rm_error= thd->get_stmt_da()->sql_errno(); +#ifdef WITH_WSREP + if (thd->is_error()) + { + WSREP_DEBUG("ha_rollback_trans(%lld, %s) rolled back: %s: %s; is_real %d", + thd->thread_id, all?"TRUE":"FALSE", WSREP_QUERY(thd), + thd->get_stmt_da()->message(), is_real_trans); + } + (void) wsrep_after_rollback(thd, all); +#endif /* Always cleanup. Even if nht==0. There may be savepoints. */ if (is_real_trans) { @@ -2296,6 +2372,14 @@ int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv) { int err; handlerton *ht= ha_info->ht(); +#ifdef WITH_WSREP + if (ht->db_type == DB_TYPE_INNODB) + { + WSREP_DEBUG("ha_rollback_to_savepoint: run before_rollbackha_rollback_trans hook"); + (void) wsrep_before_rollback(thd, !thd->in_sub_stmt); + + } +#endif // WITH_WSREP if ((err= ht->rollback(ht, thd, !thd->in_sub_stmt))) { // cannot happen my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err); @@ -6021,6 +6105,16 @@ static int binlog_log_row_internal(TABLE* table, bool error= 0; THD *const thd= table->in_use; +#ifdef WITH_WSREP + /* only InnoDB tables will be replicated through binlog emulation */ + if (WSREP_EMULATE_BINLOG(thd) && + table->file->ht->db_type != DB_TYPE_INNODB && + !(table->file->ht->db_type == DB_TYPE_PARTITION_DB && + (((ha_partition*)(table->file))->wsrep_db_type() == DB_TYPE_INNODB))) + { + return 0; + } +#endif /* WITH_WSREP */ /* If there are no table maps written to the binary log, this is the first row handled in this statement. In that case, we need @@ -6055,20 +6149,6 @@ int binlog_log_row(TABLE* table, const uchar *before_record, table->file->partition_ht()->db_type != DB_TYPE_INNODB) || (thd->wsrep_ignore_table == true)) return 0; - - /* enforce wsrep_max_ws_rows */ - if (WSREP(thd) && table->s->tmp_table == NO_TMP_TABLE) - { - thd->wsrep_affected_rows++; - if (wsrep_max_ws_rows && - thd->wsrep_exec_mode != REPL_RECV && - thd->wsrep_affected_rows > wsrep_max_ws_rows) - { - trans_rollback_stmt(thd) || trans_rollback(thd); - my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0)); - return ER_ERROR_DURING_COMMIT; - } - } #endif if (!table->file->check_table_binlog_row_based(1)) @@ -6180,6 +6260,31 @@ int handler::ha_reset() DBUG_RETURN(reset()); } +#ifdef WITH_WSREP +static int wsrep_after_row(THD *thd) +{ + DBUG_ENTER("wsrep_after_row"); + /* enforce wsrep_max_ws_rows */ + thd->wsrep_affected_rows++; + if (wsrep_max_ws_rows && + thd->wsrep_exec_mode != REPL_RECV && + thd->wsrep_affected_rows > wsrep_max_ws_rows) + { + trans_rollback_stmt(thd) || trans_rollback(thd); + my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0)); + DBUG_RETURN(ER_ERROR_DURING_COMMIT); + } + else if (wsrep_after_row(thd, false)) + { + if (!thd->get_stmt_da()->is_error()) + { + wsrep_override_error(thd, ER_LOCK_DEADLOCK); + } + DBUG_RETURN(ER_LOCK_DEADLOCK); + } + DBUG_RETURN(0); +} +#endif /* WITH_WSREP */ int handler::ha_write_row(uchar *buf) { @@ -6202,7 +6307,16 @@ int handler::ha_write_row(uchar *buf) { rows_changed++; error= binlog_log_row(table, 0, buf, log_func); +#ifdef WITH_WSREP + THD *thd= current_thd; + if (table_share->tmp_table == NO_TMP_TABLE && + WSREP(thd) && (error= wsrep_after_row(thd))) + { + DBUG_RETURN(error); + } +#endif /* WITH_WSREP */ } + DEBUG_SYNC_C("ha_write_row_end"); DBUG_RETURN(error); } @@ -6234,6 +6348,14 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data) { rows_changed++; error= binlog_log_row(table, old_data, new_data, log_func); +#ifdef WITH_WSREP + THD *thd= current_thd; + if (table_share->tmp_table == NO_TMP_TABLE && + WSREP(thd) && (error= wsrep_after_row(thd))) + { + return error; + } +#endif /* WITH_WSREP */ } return error; } @@ -6289,6 +6411,14 @@ int handler::ha_delete_row(const uchar *buf) { rows_changed++; error= binlog_log_row(table, buf, 0, log_func); +#ifdef WITH_WSREP + THD *thd= current_thd; + if (table_share->tmp_table == NO_TMP_TABLE && + WSREP(thd) && (error= wsrep_after_row(thd))) + { + return error; + } +#endif /* WITH_WSREP */ } return error; } @@ -6494,51 +6624,15 @@ int ha_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal) DBUG_RETURN(0); } - void ha_fake_trx_id(THD *thd) { - DBUG_ENTER("ha_fake_trx_id"); - - bool no_fake_trx_id= true; - - if (!WSREP(thd)) - { - DBUG_VOID_RETURN; - } - - if (thd->wsrep_ws_handle.trx_id != WSREP_UNDEFINED_TRX_ID) + DBUG_ENTER("ha_wsrep_fake_trx_id"); + if (!WSREP(thd)) { - WSREP_DEBUG("fake trx id skipped: %lu", thd->wsrep_ws_handle.trx_id); DBUG_VOID_RETURN; } - /* Try statement transaction if standard one is not set. */ - THD_TRANS *trans= (thd->transaction.all.ha_list) ? &thd->transaction.all : - &thd->transaction.stmt; - - Ha_trx_info *ha_info= trans->ha_list, *ha_info_next; - - for (; ha_info; ha_info= ha_info_next) - { - handlerton *hton= ha_info->ht(); - if (hton->fake_trx_id) - { - hton->fake_trx_id(hton, thd); - - /* Got a fake trx id. */ - no_fake_trx_id= false; - - /* - We need transaction ID from just one storage engine providing - fake_trx_id (which will most likely be the case). - */ - break; - } - ha_info_next= ha_info->next(); - } - - if (unlikely(no_fake_trx_id)) - WSREP_WARN("Cannot get fake transaction ID from storage engine."); + (void *)wsrep_ws_handle_for_trx(&thd->wsrep_ws_handle, thd->query_id); DBUG_VOID_RETURN; } |