From a8ff4f243da2114a03aabc6513dc62acd308c0e3 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Thu, 21 Feb 2019 21:57:52 +0200 Subject: MDEV-18631 Fix streaming replication with wsrep_gtid_mode=ON With wsrep_gtid_mode=ON, the appropriate commit hooks were not called in all cases for applied streaming transactions. As a fix, removed all special handling of commit order critical section from Wsrep_high_priority_service and Wsrep_storage_service. Now commit order critical section is always entered in ha_commit_trans(). Check for wsrep_run_commit_hook is now done in handler.cc, log.cc. This makes it explicit that the transaction is an active wsrep transaction which must go through commit hooks. --- sql/handler.cc | 35 +++++----- sql/log.cc | 2 +- sql/wsrep_high_priority_service.cc | 81 ++++++++++------------ sql/wsrep_high_priority_service.h | 2 +- sql/wsrep_storage_service.cc | 31 +-------- sql/wsrep_trans_observer.h | 133 ++++++++++++++++++++++++++++--------- 6 files changed, 160 insertions(+), 124 deletions(-) (limited to 'sql') diff --git a/sql/handler.cc b/sql/handler.cc index 7d2731b9a8f..58731fdcda7 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -1207,8 +1207,9 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht_arg) static int prepare_or_error(handlerton *ht, THD *thd, bool all) { - #ifdef WITH_WSREP - if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION && +#ifdef WITH_WSREP + const bool run_wsrep_hooks= wsrep_run_commit_hook(thd, all); + if (run_wsrep_hooks && ht->flags & HTON_WSREP_REPLICATION && wsrep_before_prepare(thd, all)) { return(1); @@ -1222,7 +1223,7 @@ static int prepare_or_error(handlerton *ht, THD *thd, bool all) my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); } #ifdef WITH_WSREP - if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION && + if (run_wsrep_hooks && !err && ht->flags & HTON_WSREP_REPLICATION && wsrep_after_prepare(thd, all)) { err= 1; @@ -1369,6 +1370,9 @@ int ha_commit_trans(THD *thd, bool all) Ha_trx_info *ha_info= trans->ha_list; bool need_prepare_ordered, need_commit_ordered; my_xid xid; +#ifdef WITH_WSREP + const bool run_wsrep_hooks= wsrep_run_commit_hook(thd, all); +#endif /* WITH_WSREP */ DBUG_ENTER("ha_commit_trans"); DBUG_PRINT("info",("thd: %p option_bits: %lu all: %d", thd, (ulong) thd->variables.option_bits, all)); @@ -1424,7 +1428,7 @@ int ha_commit_trans(THD *thd, bool all) if (is_real_trans) thd->transaction.cleanup(); #ifdef WITH_WSREP - if (WSREP(thd) && all && !error) + if (wsrep_is_active(thd) && is_real_trans && !error) { wsrep_commit_empty(thd, all); } @@ -1519,11 +1523,7 @@ int ha_commit_trans(THD *thd, bool all) This commit will not go through log_and_order() where wsrep commit ordering is normally done. Commit ordering must be done here. */ - bool run_wsrep_commit= (WSREP(thd) && - rw_ha_count && - wsrep_thd_is_local(thd) && - wsrep_has_changes(thd, all)); - if (run_wsrep_commit) + if (run_wsrep_hooks) error= wsrep_before_commit(thd, all); if (error) { @@ -1533,8 +1533,8 @@ int ha_commit_trans(THD *thd, bool all) #endif /* WITH_WSREP */ error= ha_commit_one_phase(thd, all); #ifdef WITH_WSREP - if (run_wsrep_commit) - error= wsrep_after_commit(thd, all); + if (run_wsrep_hooks) + error= error || wsrep_after_commit(thd, all); #endif /* WITH_WSREP */ goto done; } @@ -1567,7 +1567,7 @@ int ha_commit_trans(THD *thd, bool all) DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE();); #ifdef WITH_WSREP - if (!error && WSREP_ON) + if (run_wsrep_hooks && !error) { wsrep::seqno const s= wsrep_xid_seqno(thd->wsrep_xid); if (!s.is_undefined()) @@ -1584,7 +1584,7 @@ int ha_commit_trans(THD *thd, bool all) goto done; } #ifdef WITH_WSREP - if (wsrep_before_commit(thd, all)) + if (run_wsrep_hooks && (error = wsrep_before_commit(thd, all))) goto wsrep_err; #endif /* WITH_WSREP */ DEBUG_SYNC(thd, "ha_commit_trans_before_log_and_order"); @@ -1600,10 +1600,10 @@ int ha_commit_trans(THD *thd, bool all) error= commit_one_phase_2(thd, all, trans, is_real_trans) ? 2 : 0; #ifdef WITH_WSREP - if (error || wsrep_after_commit(thd, all)) + if (run_wsrep_hooks && (error || (error = wsrep_after_commit(thd, all)))) { mysql_mutex_lock(&thd->LOCK_thd_data); - if (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort) + if (wsrep_must_abort(thd)) { mysql_mutex_unlock(&thd->LOCK_thd_data); (void)tc_log->unlog(cookie, xid); @@ -1636,7 +1636,7 @@ done: #ifdef WITH_WSREP wsrep_err: mysql_mutex_lock(&thd->LOCK_thd_data); - if (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort) + if (run_wsrep_hooks && wsrep_must_abort(thd)) { WSREP_DEBUG("BF abort has happened after prepare & certify"); mysql_mutex_unlock(&thd->LOCK_thd_data); @@ -1672,7 +1672,8 @@ end: thd->mdl_context.release_lock(mdl_request.ticket); } #ifdef WITH_WSREP - if (WSREP(thd) && all && !error && (rw_ha_count == 0)) + if (wsrep_is_active(thd) && is_real_trans && !error && (rw_ha_count == 0) && + wsrep_not_committed(thd)) { wsrep_commit_empty(thd, all); } diff --git a/sql/log.cc b/sql/log.cc index a89be15a0d7..c7aa72f9dd0 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -7686,7 +7686,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) { int is_leader= queue_for_group_commit(entry); #ifdef WITH_WSREP - if (is_leader >= 0 && + if (wsrep_run_commit_hook(entry->thd, true) && is_leader >= 0 && wsrep_ordered_commit(entry->thd, entry->all, wsrep_apply_error())) return true; #endif /* WITH_WSREP */ diff --git a/sql/wsrep_high_priority_service.cc b/sql/wsrep_high_priority_service.cc index 64fe4ce52fe..29fc4e3362e 100644 --- a/sql/wsrep_high_priority_service.cc +++ b/sql/wsrep_high_priority_service.cc @@ -195,7 +195,8 @@ int Wsrep_high_priority_service::start_transaction( const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) { DBUG_ENTER(" Wsrep_high_priority_service::start_transaction"); - DBUG_RETURN(m_thd->wsrep_cs().start_transaction(ws_handle, ws_meta)); + DBUG_RETURN(m_thd->wsrep_cs().start_transaction(ws_handle, ws_meta) || + trans_begin(m_thd)); } const wsrep::transaction& Wsrep_high_priority_service::transaction() const @@ -204,14 +205,23 @@ const wsrep::transaction& Wsrep_high_priority_service::transaction() const DBUG_RETURN(m_thd->wsrep_trx()); } -void Wsrep_high_priority_service::adopt_transaction(const wsrep::transaction& transaction) +int Wsrep_high_priority_service::adopt_transaction( + const wsrep::transaction& transaction) { DBUG_ENTER(" Wsrep_high_priority_service::adopt_transaction"); + /* Adopt transaction first to set up transaction meta data for + trans begin. If trans_begin() fails for some reason, roll back + the wsrep transaction before return. */ m_thd->wsrep_cs().adopt_transaction(transaction); - DBUG_VOID_RETURN; + int ret= trans_begin(m_thd); + if (ret) + { + m_thd->wsrep_cs().before_rollback(); + m_thd->wsrep_cs().after_rollback(); + } + DBUG_RETURN(ret); } - int Wsrep_high_priority_service::append_fragment_and_commit( const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta, @@ -254,23 +264,8 @@ int Wsrep_high_priority_service::append_fragment_and_commit( ws_meta, true); } - if (!ret) - { - DBUG_ASSERT(wsrep_thd_trx_seqno(m_thd) > 0); - if (!do_binlog_commit) - { - ret= wsrep_before_commit(m_thd, true); - } - ret= ret || trans_commit(m_thd); - if (!do_binlog_commit) - { - if (opt_log_slave_updates) - { - ret= ret || wsrep_ordered_commit(m_thd, true, wsrep_apply_error()); - } - ret= ret || wsrep_after_commit(m_thd, true); - } - } + ret= ret || trans_commit(m_thd); + m_thd->wsrep_cs().after_applying(); m_thd->mdl_context.release_transactional_locks(); @@ -298,47 +293,39 @@ int Wsrep_high_priority_service::commit(const wsrep::ws_handle& ws_handle, thd->wsrep_cs().prepare_for_ordering(ws_handle, ws_meta, true); thd_proc_info(thd, "committing"); - int ret= 0; const bool is_ordered= !ws_meta.seqno().is_undefined(); - /* If opt_log_slave_updates is not on, applier does not write - anything to binlog cache and neither wsrep_before_commit() - nor wsrep_after_commit() we be reached from binlog code - path for applier. Therefore run wsrep_before_commit() - and wsrep_after_commit() here. wsrep_ordered_commit() - will be called from wsrep_ordered_commit_if_no_binlog(). */ - if (!opt_log_slave_updates && !opt_bin_log && is_ordered) - { - if (m_thd->transaction.all.no_2pc == false) - { - ret= wsrep_before_prepare(thd, true); - ret= ret || wsrep_after_prepare(thd, true); - } - ret= ret || wsrep_before_commit(thd, true); - } - ret= ret || trans_commit(thd); + int ret= trans_commit(thd); if (ret == 0) { m_rgi->cleanup_context(thd, 0); } - if (ret == 0 && !opt_log_slave_updates && !opt_bin_log && is_ordered) - { - ret= wsrep_after_commit(thd, true); - } - m_thd->mdl_context.release_transactional_locks(); thd_proc_info(thd, "wsrep applier committed"); if (!is_ordered) { - /* Wsrep commit was not ordered so it does not go through commit time - hooks and remains active. Roll it back to make cleanup happen - in after_applying() call. */ m_thd->wsrep_cs().before_rollback(); m_thd->wsrep_cs().after_rollback(); } + else if (m_thd->wsrep_trx().state() == wsrep::transaction::s_executing) + { + /* + Wsrep commit was ordered but it did not go through commit time + hooks and remains active. Cycle through commit hooks to release + commit order and to make cleanup happen in after_applying() call. + + This is a workaround for CTAS with empty result set. + */ + WSREP_DEBUG("Commit not finished for applier %llu", thd->thread_id); + ret= ret || m_thd->wsrep_cs().before_commit() || + m_thd->wsrep_cs().ordered_commit() || + m_thd->wsrep_cs().after_commit(); + } + + thd->lex->sql_command= SQLCOM_END; must_exit_= check_exit_status(); DBUG_RETURN(ret); @@ -380,6 +367,8 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta, trans_commit(thd); thd->close_temporary_tables(); + thd->lex->sql_command= SQLCOM_END; + wsrep_set_SE_checkpoint(client_state.toi_meta().gtid()); must_exit_= check_exit_status(); diff --git a/sql/wsrep_high_priority_service.h b/sql/wsrep_high_priority_service.h index 4012ca60a3e..c483aa82d62 100644 --- a/sql/wsrep_high_priority_service.h +++ b/sql/wsrep_high_priority_service.h @@ -36,7 +36,7 @@ public: int start_transaction(const wsrep::ws_handle&, const wsrep::ws_meta&); const wsrep::transaction& transaction() const; - void adopt_transaction(const wsrep::transaction&); + int adopt_transaction(const wsrep::transaction&); int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&) = 0; int append_fragment_and_commit(const wsrep::ws_handle&, const wsrep::ws_meta&, diff --git a/sql/wsrep_storage_service.cc b/sql/wsrep_storage_service.cc index 5a15f22ab57..e164114b733 100644 --- a/sql/wsrep_storage_service.cc +++ b/sql/wsrep_storage_service.cc @@ -147,39 +147,14 @@ int Wsrep_storage_service::commit(const wsrep::ws_handle& ws_handle, WSREP_DEBUG("Storage service commit: %llu, %lld", ws_meta.transaction_id().get(), ws_meta.seqno().get()); int ret= 0; - const bool do_binlog_commit= (opt_log_slave_updates && wsrep_gtid_mode); const bool is_ordered= !ws_meta.seqno().is_undefined(); - /* - Write skip event into binlog if gtid_mode is on. This is to - maintain gtid continuity. - */ - if (do_binlog_commit && is_ordered) - { - ret= wsrep_write_skip_event(m_thd); - } - if (!ret && is_ordered) + if (is_ordered) { - ret= m_thd->wsrep_cs().prepare_for_ordering(ws_handle, - ws_meta, true); + ret= m_thd->wsrep_cs().prepare_for_ordering(ws_handle, ws_meta, true); } - if (!ret) - { - if (!do_binlog_commit && is_ordered) - { - ret= wsrep_before_commit(m_thd, true); - } - ret= ret || trans_commit(m_thd); - if (!do_binlog_commit && is_ordered) - { - if (opt_log_slave_updates) - { - ret= ret || wsrep_ordered_commit(m_thd, true, wsrep_apply_error()); - } - ret= ret || wsrep_after_commit(m_thd, true); - } - } + ret= ret || trans_commit(m_thd); if (!is_ordered) { diff --git a/sql/wsrep_trans_observer.h b/sql/wsrep_trans_observer.h index 04a1224701e..46b9cf75976 100644 --- a/sql/wsrep_trans_observer.h +++ b/sql/wsrep_trans_observer.h @@ -1,4 +1,4 @@ -/* Copyright 2016 Codership Oy +/* Copyright 2016-2019 Codership Oy This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -32,7 +32,39 @@ class THD; static inline bool wsrep_is_active(THD* thd) { return (thd->wsrep_cs().state() != wsrep::client_state::s_none && - thd->wsrep_cs().transaction().active()); + thd->wsrep_cs().transaction().active()); +} + +/* + Return true if transaction is ordered. + */ +static inline bool wsrep_is_ordered(THD* thd) +{ + return thd->wsrep_trx().ordered(); +} + +/* + Return true if transaction has been BF aborted but has not been + rolled back yet. + + It is required that the caller holds thd->LOCK_thd_data. +*/ +static inline bool wsrep_must_abort(THD* thd) +{ + mysql_mutex_assert_owner(&thd->LOCK_thd_data); + return (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort); +} + +/* + Return true if transaction has not been committed. + + Note that we don't require thd->LOCK_thd_data here. Calling this method + makes sense only from codepaths which are past ordered_commit state + and the wsrep transaction is immune to BF aborts at that point. +*/ +static inline bool wsrep_not_committed(THD* thd) +{ + return (thd->wsrep_trx().state() != wsrep::transaction::s_committed); } /* @@ -47,7 +79,7 @@ static inline bool wsrep_is_real(THD* thd, bool all) /* Check if a transaction has generated changes. */ -static inline bool wsrep_has_changes(THD* thd, my_bool all) +static inline bool wsrep_has_changes(THD* thd) { return (thd->wsrep_trx().is_empty() == false); } @@ -137,11 +169,42 @@ static inline int wsrep_after_row(THD* thd, bool) /* Helper method to determine whether commit time hooks should be run for the transaction. + + Commit hooks must be run in the following cases: + - The transaction is local and has generated write set and is committing. + - The transaction has been BF aborted + - Is running in high priority mode and is ordered. This can be replayer, + applier or storage access. */ static inline bool wsrep_run_commit_hook(THD* thd, bool all) { - return (wsrep_is_real(thd, all) && wsrep_is_active(thd) && - (wsrep_thd_is_applying(thd) || wsrep_has_changes(thd, all))); + DBUG_ENTER("wsrep_run_commit_hook"); + DBUG_PRINT("wsrep", ("Is_active: %d is_real %d has_changes %d is_applying %d " + "is_ordered: %d", + wsrep_is_active(thd), wsrep_is_real(thd, all), + wsrep_has_changes(thd), wsrep_thd_is_applying(thd), + wsrep_is_ordered(thd))); + /* Is MST commit or autocommit? */ + bool ret= wsrep_is_active(thd) && wsrep_is_real(thd, all); + if (ret && !(wsrep_has_changes(thd) || /* Has generated write set */ + /* Is high priority (replay, applier, storage) and the + transaction is scheduled for commit ordering */ + (wsrep_thd_is_applying(thd) && wsrep_is_ordered(thd)))) + { + mysql_mutex_lock(&thd->LOCK_thd_data); + DBUG_PRINT("wsrep", ("state: %s", + wsrep::to_c_string(thd->wsrep_trx().state()))); + /* Transaction is local but has no changes, the commit hooks will + be skipped and the wsrep transaction is terminated in + wsrep_commit_empty() */ + if (thd->wsrep_trx().state() == wsrep::transaction::s_executing) + { + ret= false; + } + mysql_mutex_unlock(&thd->LOCK_thd_data); + } + DBUG_PRINT("wsrep", ("return: %d", ret)); + DBUG_RETURN(ret); } /* @@ -154,14 +217,12 @@ static inline int wsrep_before_prepare(THD* thd, bool all) DBUG_ENTER("wsrep_before_prepare"); WSREP_DEBUG("wsrep_before_prepare: %d", wsrep_is_real(thd, all)); int ret= 0; - if (wsrep_run_commit_hook(thd, all)) + DBUG_ASSERT(wsrep_run_commit_hook(thd, all)); + if ((ret= thd->wsrep_cs().before_prepare()) == 0) { - if ((ret= thd->wsrep_cs().before_prepare()) == 0) - { - DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); - wsrep_xid_init(&thd->wsrep_xid, + DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); + wsrep_xid_init(&thd->wsrep_xid, thd->wsrep_trx().ws_meta().gtid()); - } } DBUG_RETURN(ret); } @@ -175,8 +236,8 @@ static inline int wsrep_after_prepare(THD* thd, bool all) { DBUG_ENTER("wsrep_after_prepare"); WSREP_DEBUG("wsrep_after_prepare: %d", wsrep_is_real(thd, all)); - int ret= (wsrep_run_commit_hook(thd, all) ? - thd->wsrep_cs().after_prepare() : 0); + DBUG_ASSERT(wsrep_run_commit_hook(thd, all)); + int ret= thd->wsrep_cs().after_prepare(); DBUG_ASSERT(ret == 0 || thd->wsrep_cs().current_error() || thd->wsrep_cs().transaction().state() == wsrep::transaction::s_must_replay); DBUG_RETURN(ret); @@ -198,14 +259,12 @@ static inline int wsrep_before_commit(THD* thd, bool all) wsrep_is_real(thd, all), (long long)wsrep_thd_trx_seqno(thd)); int ret= 0; - if (wsrep_run_commit_hook(thd, all)) + DBUG_ASSERT(wsrep_run_commit_hook(thd, all)); + if ((ret= thd->wsrep_cs().before_commit()) == 0) { - if ((ret= thd->wsrep_cs().before_commit()) == 0) - { - DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); - wsrep_xid_init(&thd->wsrep_xid, - thd->wsrep_trx().ws_meta().gtid()); - } + DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); + wsrep_xid_init(&thd->wsrep_xid, + thd->wsrep_trx().ws_meta().gtid()); } DBUG_RETURN(ret); } @@ -228,8 +287,8 @@ static inline int wsrep_ordered_commit(THD* thd, { DBUG_ENTER("wsrep_ordered_commit"); WSREP_DEBUG("wsrep_ordered_commit: %d", wsrep_is_real(thd, all)); - DBUG_RETURN(wsrep_run_commit_hook(thd, all) ? - thd->wsrep_cs().ordered_commit() : 0); + DBUG_ASSERT(wsrep_run_commit_hook(thd, all)); + DBUG_RETURN(thd->wsrep_cs().ordered_commit()); } /* @@ -244,14 +303,12 @@ static inline int wsrep_after_commit(THD* thd, bool all) wsrep_is_real(thd, all), wsrep_is_active(thd), (long long)wsrep_thd_trx_seqno(thd), - wsrep_has_changes(thd, all)); - if (wsrep_run_commit_hook(thd, all)) - { - DBUG_RETURN((wsrep_ordered_commit_if_no_binlog(thd, all) || - (thd->wsrep_xid.null(), - thd->wsrep_cs().after_commit()))); - } - DBUG_RETURN(0); + wsrep_has_changes(thd)); + DBUG_ASSERT(wsrep_run_commit_hook(thd, all)); + DBUG_RETURN((thd->wsrep_trx().state() == wsrep::transaction::s_committing + ? thd->wsrep_cs().ordered_commit() : 0) || + (thd->wsrep_xid.null(), + thd->wsrep_cs().after_commit())); } /* @@ -415,11 +472,25 @@ static inline void wsrep_commit_empty(THD* thd, bool all) thd->wsrep_trx().active() && thd->wsrep_trx().state() != wsrep::transaction::s_committed) { + /* @todo CTAS with STATEMENT binlog format and empty result set + seems to be committing empty. Figure out why and try to fix + elsewhere. */ + DBUG_ASSERT(!wsrep_has_changes(thd) || + (thd->lex->sql_command == SQLCOM_CREATE_TABLE && + !thd->is_current_stmt_binlog_format_row())); bool have_error= wsrep_current_error(thd); int ret= wsrep_before_rollback(thd, all) || wsrep_after_rollback(thd, all) || wsrep_after_statement(thd); - DBUG_ASSERT(have_error || !wsrep_current_error(thd)); + /* The committing transaction was empty but it held some locks and + got BF aborted. As there were no certified changes in the + data, we ignore the deadlock error and rely on error reporting + by storage engine/server. */ + if (!ret && !have_error && wsrep_current_error(thd)) + { + DBUG_ASSERT(wsrep_current_error(thd) == wsrep::e_deadlock_error); + thd->wsrep_cs().reset_error(); + } if (ret) { WSREP_DEBUG("wsrep_commit_empty failed: %d", wsrep_current_error(thd)); -- cgit v1.2.1