diff options
Diffstat (limited to 'sql/handler.cc')
| -rw-r--r-- | sql/handler.cc | 339 |
1 files changed, 250 insertions, 89 deletions
diff --git a/sql/handler.cc b/sql/handler.cc index 001055cd475..1b5aaebe3cf 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -54,8 +54,12 @@ #include "semisync_master.h" #include "wsrep_mysqld.h" -#include "wsrep.h" +#ifdef WITH_WSREP +#include "wsrep_binlog.h" #include "wsrep_xid.h" +#include "wsrep_thd.h" +#include "wsrep_trans_observer.h" /* wsrep transaction hooks */ +#endif /* WITH_WSREP */ /* While we have legacy_db_type, we have this array to @@ -251,6 +255,9 @@ handlerton *ha_checktype(THD *thd, handlerton *hton, bool no_substitute) if (no_substitute) return NULL; +#ifdef WITH_WSREP + (void)wsrep_after_rollback(thd, false); +#endif /* WITH_WSREP */ return ha_default_handlerton(thd); } /* ha_checktype */ @@ -1199,17 +1206,28 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht_arg) static int prepare_or_error(handlerton *ht, THD *thd, bool all) { + #ifdef WITH_WSREP + if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION && + wsrep_before_prepare(thd, all)) + { + return(1); + } +#endif /* WITH_WSREP */ + int err= ht->prepare(ht, thd, all); status_var_increment(thd->status_var.ha_prepare_count); if (err) { - /* avoid sending error, if we're going to replay the transaction */ -#ifdef WITH_WSREP - if (ht != wsrep_hton || - err == EMSGSIZE || thd->wsrep_conflict_state != MUST_REPLAY) -#endif my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); } +#ifdef WITH_WSREP + if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION && + wsrep_after_prepare(thd, all)) + { + err= 1; + } +#endif /* WITH_WSREP */ + return err; } @@ -1394,7 +1412,7 @@ int ha_commit_trans(THD *thd, bool all) } #ifdef WITH_ARIA_STORAGE_ENGINE - ha_maria::implicit_commit(thd, TRUE); + ha_maria::implicit_commit(thd, TRUE); #endif if (!ha_info) @@ -1404,6 +1422,12 @@ int ha_commit_trans(THD *thd, bool all) */ if (is_real_trans) thd->transaction.cleanup(); +#ifdef WITH_WSREP + if (WSREP(thd) && all && !error) + { + wsrep_commit_empty(thd, all); + } +#endif /* WITH_WSREP */ DBUG_RETURN(0); } @@ -1489,7 +1513,28 @@ int ha_commit_trans(THD *thd, bool all) if (trans->no_2pc || (rw_ha_count <= 1)) { +#ifdef WITH_WSREP + /* + This commit will not go through log_and_order() where wsrep commit + ordering is normally done. Commit ordering must be done here. + */ + bool run_wsrep_commit= (WSREP(thd) && + rw_ha_count && + wsrep_thd_is_local(thd) && + wsrep_has_changes(thd, all)); + if (run_wsrep_commit) + error= wsrep_before_commit(thd, all); + if (error) + { + ha_rollback_trans(thd, FALSE); + goto wsrep_err; + } +#endif /* WITH_WSREP */ error= ha_commit_one_phase(thd, all); +#ifdef WITH_WSREP + if (run_wsrep_commit) + error= wsrep_after_commit(thd, all); +#endif /* WITH_WSREP */ goto done; } @@ -1521,10 +1566,14 @@ int ha_commit_trans(THD *thd, bool all) DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE();); #ifdef WITH_WSREP - if (!error && WSREP_ON && wsrep_is_wsrep_xid(&thd->transaction.xid_state.xid)) + if (!error && WSREP_ON) { - // xid was rewritten by wsrep - xid= wsrep_xid_seqno(thd->transaction.xid_state.xid); + wsrep::seqno const s= wsrep_xid_seqno(thd->wsrep_xid); + if (!s.is_undefined()) + { + // xid was rewritten by wsrep + xid= s.get(); + } } #endif /* WITH_WSREP */ @@ -1533,18 +1582,35 @@ int ha_commit_trans(THD *thd, bool all) error= commit_one_phase_2(thd, all, trans, is_real_trans); goto done; } - +#ifdef WITH_WSREP + if (wsrep_before_commit(thd, all)) + goto wsrep_err; +#endif /* WITH_WSREP */ DEBUG_SYNC(thd, "ha_commit_trans_before_log_and_order"); cookie= tc_log->log_and_order(thd, xid, all, need_prepare_ordered, need_commit_ordered); if (!cookie) + { + WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie); goto err; - + } DEBUG_SYNC(thd, "ha_commit_trans_after_log_and_order"); DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE();); error= commit_one_phase_2(thd, all, trans, is_real_trans) ? 2 : 0; - +#ifdef WITH_WSREP + if (error || wsrep_after_commit(thd, all)) + { + mysql_mutex_lock(&thd->LOCK_thd_data); + if (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort) + { + mysql_mutex_unlock(&thd->LOCK_thd_data); + (void)tc_log->unlog(cookie, xid); + goto wsrep_err; + } + mysql_mutex_unlock(&thd->LOCK_thd_data); + } +#endif /* WITH_WSREP */ DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE();); if (tc_log->unlog(cookie, xid)) { @@ -1566,6 +1632,19 @@ done: goto end; /* Come here if error and we need to rollback. */ +#ifdef WITH_WSREP +wsrep_err: + mysql_mutex_lock(&thd->LOCK_thd_data); + if (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort) + { + WSREP_DEBUG("BF abort has happened after prepare & certify"); + mysql_mutex_unlock(&thd->LOCK_thd_data); + ha_rollback_trans(thd, TRUE); + } + else + mysql_mutex_unlock(&thd->LOCK_thd_data); + +#endif /* WITH_WSREP */ err: error= 1; /* Transaction was rolled back */ /* @@ -1575,7 +1654,11 @@ err: */ if (!(thd->rgi_slave && thd->rgi_slave->is_parallel_exec)) ha_rollback_trans(thd, all); - + else + { + WSREP_DEBUG("rollback skipped %p %d",thd->rgi_slave, + thd->rgi_slave->is_parallel_exec); + } end: if (rw_trans && mdl_request.ticket) { @@ -1587,6 +1670,13 @@ end: */ thd->mdl_context.release_lock(mdl_request.ticket); } +#ifdef WITH_WSREP + if (WSREP(thd) && all && !error && (rw_ha_count == 0)) + { + wsrep_commit_empty(thd, all); + } +#endif /* WITH_WSREP */ + DBUG_RETURN(error); } @@ -1744,6 +1834,9 @@ int ha_rollback_trans(THD *thd, bool all) DBUG_RETURN(1); } +#ifdef WITH_WSREP + (void) wsrep_before_rollback(thd, all); +#endif /* WITH_WSREP */ if (ha_info) { /* Close all cursors that can not survive ROLLBACK */ @@ -1759,9 +1852,9 @@ int ha_rollback_trans(THD *thd, bool all) my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err); error=1; #ifdef WITH_WSREP - WSREP_WARN("handlerton rollback failed, thd %llu %lld conf %d SQL %s", - thd->thread_id, thd->query_id, thd->wsrep_conflict_state, - thd->query()); + WSREP_WARN("handlerton rollback failed, thd %lld %lld conf %d SQL %s", + thd->thread_id, thd->query_id, thd->wsrep_trx().state(), + thd->query()); #endif /* WITH_WSREP */ } status_var_increment(thd->status_var.ha_rollback_count); @@ -1780,6 +1873,15 @@ int ha_rollback_trans(THD *thd, bool all) thd->transaction.xid_state.xa_state != XA_NOTR) thd->transaction.xid_state.rm_error= thd->get_stmt_da()->sql_errno(); +#ifdef WITH_WSREP + if (thd->is_error()) + { + WSREP_DEBUG("ha_rollback_trans(%lld, %s) rolled back: %s: %s; is_real %d", + thd->thread_id, all?"TRUE":"FALSE", WSREP_QUERY(thd), + thd->get_stmt_da()->message(), is_real_trans); + } + (void) wsrep_after_rollback(thd, all); +#endif /* WITH_WSREP */ /* Always cleanup. Even if nht==0. There may be savepoints. */ if (is_real_trans) { @@ -1913,6 +2015,28 @@ static char* xid_to_str(char *buf, XID *xid) } #endif +#ifdef WITH_WSREP +static my_xid wsrep_order_and_check_continuity(XID *list, int len) +{ + wsrep_sort_xid_array(list, len); + wsrep::gtid cur_position= wsrep_get_SE_checkpoint(); + long long cur_seqno= cur_position.seqno().get(); + for (int i= 0; i < len; ++i) + { + if (!wsrep_is_wsrep_xid(list + i) || + wsrep_xid_seqno(list + i) != cur_seqno + 1) + { + WSREP_WARN("Discovered discontinuity in recovered wsrep " + "transaction XIDs. Truncating the recovery list to " + "%d entries", i); + break; + } + ++cur_seqno; + } + WSREP_INFO("Last wsrep seqno to be recovered %lld", cur_seqno); + return (cur_seqno < 0 ? 0 : cur_seqno); +} +#endif /* WITH_WSREP */ /** recover() step of xa. @@ -1950,10 +2074,32 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin, { sql_print_information("Found %d prepared transaction(s) in %s", got, hton_name(hton)->str); +#ifdef WITH_WSREP + /* If wsrep_on=ON, XIDs are first ordered and then the range of + recovered XIDs is checked for continuity. All the XIDs which + are in continuous range can be safely committed if binlog + is off since they have already ordered and certified in the + cluster. + + The discontinuity of wsrep XIDs may happen because the GTID + is assigned for transaction in wsrep_before_prepare(), but the + commit order is entered in wsrep_before_commit(). This means that + transactions may run prepare step out of order and may + result in gap in wsrep XIDs. This can be the case for example + if we have T1 with seqno 1 and T2 with seqno 2 and the server + crashes after T2 finishes prepare step but before T1 starts + the prepare. + */ + my_xid wsrep_limit= 0; + if (WSREP_ON) + { + wsrep_limit= wsrep_order_and_check_continuity(info->list, got); + } +#endif /* WITH_WSREP */ for (int i=0; i < got; i ++) { my_xid x= IF_WSREP(WSREP_ON && wsrep_is_wsrep_xid(&info->list[i]) ? - wsrep_xid_seqno(info->list[i]) : + wsrep_xid_seqno(&info->list[i]) : info->list[i].get_my_xid(), info->list[i].get_my_xid()); if (!x) // not "mine" - that is generated by external TM @@ -1972,9 +2118,12 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin, continue; } // recovery mode - if (info->commit_list ? - my_hash_search(info->commit_list, (uchar *)&x, sizeof(x)) != 0 : - tc_heuristic_recover == TC_HEURISTIC_RECOVER_COMMIT) + if (IF_WSREP((wsrep_emulate_bin_log && + wsrep_is_wsrep_xid(info->list + i) && + x <= wsrep_limit), false) || + (info->commit_list ? + my_hash_search(info->commit_list, (uchar *)&x, sizeof(x)) != 0 : + tc_heuristic_recover == TC_HEURISTIC_RECOVER_COMMIT)) { #ifndef DBUG_OFF int rc= @@ -2332,11 +2481,26 @@ int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv) { int err; handlerton *ht= ha_info->ht(); +#ifdef WITH_WSREP + if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION) + { + WSREP_DEBUG("ha_rollback_to_savepoint: run before_rollbackha_rollback_trans hook"); + (void) wsrep_before_rollback(thd, !thd->in_sub_stmt); + + } +#endif // WITH_WSREP if ((err= ht->rollback(ht, thd, !thd->in_sub_stmt))) { // cannot happen my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err); error=1; } +#ifdef WITH_WSREP + if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION) + { + WSREP_DEBUG("ha_rollback_to_savepoint: run after_rollback hook"); + (void) wsrep_after_rollback(thd, !thd->in_sub_stmt); + } +#endif // WITH_WSREP status_var_increment(thd->status_var.ha_rollback_count); ha_info_next= ha_info->next(); ha_info->reset(); /* keep it conveniently zero-filled */ @@ -2353,6 +2517,16 @@ int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv) */ int ha_savepoint(THD *thd, SAVEPOINT *sv) { +#ifdef WITH_WSREP + /* + Register binlog hton for savepoint processing if wsrep binlog + emulation is on. + */ + if (WSREP_EMULATE_BINLOG(thd) && wsrep_thd_is_local(thd)) + { + wsrep_register_binlog_handler(thd, thd->in_multi_stmt_transaction_mode()); + } +#endif /* WITH_WSREP */ int error=0; THD_TRANS *trans= (thd->in_sub_stmt ? &thd->transaction.stmt : &thd->transaction.all); @@ -5968,6 +6142,12 @@ bool handler::check_table_binlog_row_based(bool binlog_row) return false; if (unlikely((table->in_use->variables.sql_log_bin_off))) return 0; /* Called by partitioning engine */ +#ifdef WITH_WSREP + if (!table->in_use->variables.sql_log_bin && + wsrep_thd_is_applying(table->in_use)) + return 0; /* wsrep patch sets sql_log_bin to silence binlogging + from high priority threads */ +#endif /* WITH_WSREP */ if (unlikely((!check_table_binlog_row_based_done))) { check_table_binlog_row_based_done= 1; @@ -5998,12 +6178,12 @@ bool handler::check_table_binlog_row_based_internal(bool binlog_row) Otherwise, return 'true' if binary logging is on. */ IF_WSREP(((WSREP_EMULATE_BINLOG(thd) && - (thd->wsrep_exec_mode != REPL_RECV)) || + wsrep_thd_is_local(thd)) || ((WSREP(thd) || (thd->variables.option_bits & OPTION_BIN_LOG)) && mysql_bin_log.is_open())), - (thd->variables.option_bits & OPTION_BIN_LOG) && - mysql_bin_log.is_open())); + (thd->variables.option_bits & OPTION_BIN_LOG) && + mysql_bin_log.is_open())); } @@ -6128,23 +6308,9 @@ int binlog_log_row(TABLE* table, const uchar *before_record, /* only InnoDB tables will be replicated through binlog emulation */ if ((WSREP_EMULATE_BINLOG(thd) && - table->file->partition_ht()->db_type != DB_TYPE_INNODB) || - (thd->wsrep_ignore_table == true)) + !(table->file->partition_ht()->flags & HTON_WSREP_REPLICATION)) || + thd->wsrep_ignore_table == true) return 0; - - /* enforce wsrep_max_ws_rows */ - if (WSREP(thd) && table->s->tmp_table == NO_TMP_TABLE) - { - thd->wsrep_affected_rows++; - if (wsrep_max_ws_rows && - thd->wsrep_exec_mode != REPL_RECV && - thd->wsrep_affected_rows > wsrep_max_ws_rows) - { - trans_rollback_stmt(thd) || trans_rollback(thd); - my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0)); - return ER_ERROR_DURING_COMMIT; - } - } #endif if (!table->file->check_table_binlog_row_based(1)) @@ -6256,6 +6422,27 @@ int handler::ha_reset() DBUG_RETURN(reset()); } +#ifdef WITH_WSREP +static int wsrep_after_row(THD *thd) +{ + DBUG_ENTER("wsrep_after_row"); + /* enforce wsrep_max_ws_rows */ + thd->wsrep_affected_rows++; + if (wsrep_max_ws_rows && + wsrep_thd_is_local(thd) && + thd->wsrep_affected_rows > wsrep_max_ws_rows) + { + trans_rollback_stmt(thd) || trans_rollback(thd); + my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0)); + DBUG_RETURN(ER_ERROR_DURING_COMMIT); + } + else if (wsrep_after_row(thd, false)) + { + DBUG_RETURN(ER_LOCK_DEADLOCK); + } + DBUG_RETURN(0); +} +#endif /* WITH_WSREP */ int handler::ha_write_row(uchar *buf) { @@ -6278,7 +6465,15 @@ int handler::ha_write_row(uchar *buf) { rows_changed++; error= binlog_log_row(table, 0, buf, log_func); +#ifdef WITH_WSREP + if (table_share->tmp_table == NO_TMP_TABLE && + WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd()))) + { + DBUG_RETURN(error); + } +#endif /* WITH_WSREP */ } + DEBUG_SYNC_C("ha_write_row_end"); DBUG_RETURN(error); } @@ -6310,6 +6505,13 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data) { rows_changed++; error= binlog_log_row(table, old_data, new_data, log_func); +#ifdef WITH_WSREP + if (table_share->tmp_table == NO_TMP_TABLE && + WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd()))) + { + return error; + } +#endif /* WITH_WSREP */ } return error; } @@ -6365,6 +6567,13 @@ int handler::ha_delete_row(const uchar *buf) { rows_changed++; error= binlog_log_row(table, buf, 0, log_func); +#ifdef WITH_WSREP + if (table_share->tmp_table == NO_TMP_TABLE && + WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd()))) + { + return error; + } +#endif /* WITH_WSREP */ } return error; } @@ -6554,7 +6763,7 @@ int ha_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal) DBUG_ENTER("ha_abort_transaction"); if (!WSREP(bf_thd) && !(bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU && - bf_thd->wsrep_exec_mode == TOTAL_ORDER)) { + wsrep_thd_is_toi(bf_thd))) { DBUG_RETURN(0); } @@ -6570,54 +6779,6 @@ int ha_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal) DBUG_RETURN(0); } - -void ha_fake_trx_id(THD *thd) -{ - DBUG_ENTER("ha_fake_trx_id"); - - bool no_fake_trx_id= true; - - if (!WSREP(thd)) - { - DBUG_VOID_RETURN; - } - - if (thd->wsrep_ws_handle.trx_id != WSREP_UNDEFINED_TRX_ID) - { - WSREP_DEBUG("fake trx id skipped: %" PRIu64, thd->wsrep_ws_handle.trx_id); - DBUG_VOID_RETURN; - } - - /* Try statement transaction if standard one is not set. */ - THD_TRANS *trans= (thd->transaction.all.ha_list) ? &thd->transaction.all : - &thd->transaction.stmt; - - Ha_trx_info *ha_info= trans->ha_list, *ha_info_next; - - for (; ha_info; ha_info= ha_info_next) - { - handlerton *hton= ha_info->ht(); - if (hton->fake_trx_id) - { - hton->fake_trx_id(hton, thd); - - /* Got a fake trx id. */ - no_fake_trx_id= false; - - /* - We need transaction ID from just one storage engine providing - fake_trx_id (which will most likely be the case). - */ - break; - } - ha_info_next= ha_info->next(); - } - - if (unlikely(no_fake_trx_id)) - WSREP_WARN("Cannot get fake transaction ID from storage engine."); - - DBUG_VOID_RETURN; -} #endif /* WITH_WSREP */ |
