diff options
author | Teemu Ollakka <teemu.ollakka@galeracluster.com> | 2019-02-21 21:57:52 +0200 |
---|---|---|
committer | Jan Lindström <jan.lindstrom@mariadb.com> | 2019-03-04 09:00:25 +0200 |
commit | a8ff4f243da2114a03aabc6513dc62acd308c0e3 (patch) | |
tree | 5b5f3e7091cdb8e9effc8da578d5fa8d4ae75404 /sql/wsrep_trans_observer.h | |
parent | fa52846bc7fd989691ed3277260570e86d6d199c (diff) | |
download | mariadb-git-a8ff4f243da2114a03aabc6513dc62acd308c0e3.tar.gz |
MDEV-18631 Fix streaming replication with wsrep_gtid_mode=ON
With wsrep_gtid_mode=ON, the appropriate commit hooks were not
called in all cases for applied streaming transactions.
As a fix, removed all special handling of commit order critical
section from Wsrep_high_priority_service and Wsrep_storage_service.
Now commit order critical section is always entered in ha_commit_trans().
Check for wsrep_run_commit_hook is now done in handler.cc, log.cc.
This makes it explicit that the transaction is an active wsrep
transaction which must go through commit hooks.
Diffstat (limited to 'sql/wsrep_trans_observer.h')
-rw-r--r-- | sql/wsrep_trans_observer.h | 133 |
1 files changed, 102 insertions, 31 deletions
diff --git a/sql/wsrep_trans_observer.h b/sql/wsrep_trans_observer.h index 04a1224701e..46b9cf75976 100644 --- a/sql/wsrep_trans_observer.h +++ b/sql/wsrep_trans_observer.h @@ -1,4 +1,4 @@ -/* Copyright 2016 Codership Oy <http://www.codership.com> +/* Copyright 2016-2019 Codership Oy <http://www.codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -32,7 +32,39 @@ class THD; static inline bool wsrep_is_active(THD* thd) { return (thd->wsrep_cs().state() != wsrep::client_state::s_none && - thd->wsrep_cs().transaction().active()); + thd->wsrep_cs().transaction().active()); +} + +/* + Return true if transaction is ordered. + */ +static inline bool wsrep_is_ordered(THD* thd) +{ + return thd->wsrep_trx().ordered(); +} + +/* + Return true if transaction has been BF aborted but has not been + rolled back yet. + + It is required that the caller holds thd->LOCK_thd_data. +*/ +static inline bool wsrep_must_abort(THD* thd) +{ + mysql_mutex_assert_owner(&thd->LOCK_thd_data); + return (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort); +} + +/* + Return true if transaction has not been committed. + + Note that we don't require thd->LOCK_thd_data here. Calling this method + makes sense only from codepaths which are past ordered_commit state + and the wsrep transaction is immune to BF aborts at that point. +*/ +static inline bool wsrep_not_committed(THD* thd) +{ + return (thd->wsrep_trx().state() != wsrep::transaction::s_committed); } /* @@ -47,7 +79,7 @@ static inline bool wsrep_is_real(THD* thd, bool all) /* Check if a transaction has generated changes. */ -static inline bool wsrep_has_changes(THD* thd, my_bool all) +static inline bool wsrep_has_changes(THD* thd) { return (thd->wsrep_trx().is_empty() == false); } @@ -137,11 +169,42 @@ static inline int wsrep_after_row(THD* thd, bool) /* Helper method to determine whether commit time hooks should be run for the transaction. + + Commit hooks must be run in the following cases: + - The transaction is local and has generated write set and is committing. + - The transaction has been BF aborted + - Is running in high priority mode and is ordered. This can be replayer, + applier or storage access. */ static inline bool wsrep_run_commit_hook(THD* thd, bool all) { - return (wsrep_is_real(thd, all) && wsrep_is_active(thd) && - (wsrep_thd_is_applying(thd) || wsrep_has_changes(thd, all))); + DBUG_ENTER("wsrep_run_commit_hook"); + DBUG_PRINT("wsrep", ("Is_active: %d is_real %d has_changes %d is_applying %d " + "is_ordered: %d", + wsrep_is_active(thd), wsrep_is_real(thd, all), + wsrep_has_changes(thd), wsrep_thd_is_applying(thd), + wsrep_is_ordered(thd))); + /* Is MST commit or autocommit? */ + bool ret= wsrep_is_active(thd) && wsrep_is_real(thd, all); + if (ret && !(wsrep_has_changes(thd) || /* Has generated write set */ + /* Is high priority (replay, applier, storage) and the + transaction is scheduled for commit ordering */ + (wsrep_thd_is_applying(thd) && wsrep_is_ordered(thd)))) + { + mysql_mutex_lock(&thd->LOCK_thd_data); + DBUG_PRINT("wsrep", ("state: %s", + wsrep::to_c_string(thd->wsrep_trx().state()))); + /* Transaction is local but has no changes, the commit hooks will + be skipped and the wsrep transaction is terminated in + wsrep_commit_empty() */ + if (thd->wsrep_trx().state() == wsrep::transaction::s_executing) + { + ret= false; + } + mysql_mutex_unlock(&thd->LOCK_thd_data); + } + DBUG_PRINT("wsrep", ("return: %d", ret)); + DBUG_RETURN(ret); } /* @@ -154,14 +217,12 @@ static inline int wsrep_before_prepare(THD* thd, bool all) DBUG_ENTER("wsrep_before_prepare"); WSREP_DEBUG("wsrep_before_prepare: %d", wsrep_is_real(thd, all)); int ret= 0; - if (wsrep_run_commit_hook(thd, all)) + DBUG_ASSERT(wsrep_run_commit_hook(thd, all)); + if ((ret= thd->wsrep_cs().before_prepare()) == 0) { - if ((ret= thd->wsrep_cs().before_prepare()) == 0) - { - DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); - wsrep_xid_init(&thd->wsrep_xid, + DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); + wsrep_xid_init(&thd->wsrep_xid, thd->wsrep_trx().ws_meta().gtid()); - } } DBUG_RETURN(ret); } @@ -175,8 +236,8 @@ static inline int wsrep_after_prepare(THD* thd, bool all) { DBUG_ENTER("wsrep_after_prepare"); WSREP_DEBUG("wsrep_after_prepare: %d", wsrep_is_real(thd, all)); - int ret= (wsrep_run_commit_hook(thd, all) ? - thd->wsrep_cs().after_prepare() : 0); + DBUG_ASSERT(wsrep_run_commit_hook(thd, all)); + int ret= thd->wsrep_cs().after_prepare(); DBUG_ASSERT(ret == 0 || thd->wsrep_cs().current_error() || thd->wsrep_cs().transaction().state() == wsrep::transaction::s_must_replay); DBUG_RETURN(ret); @@ -198,14 +259,12 @@ static inline int wsrep_before_commit(THD* thd, bool all) wsrep_is_real(thd, all), (long long)wsrep_thd_trx_seqno(thd)); int ret= 0; - if (wsrep_run_commit_hook(thd, all)) + DBUG_ASSERT(wsrep_run_commit_hook(thd, all)); + if ((ret= thd->wsrep_cs().before_commit()) == 0) { - if ((ret= thd->wsrep_cs().before_commit()) == 0) - { - DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); - wsrep_xid_init(&thd->wsrep_xid, - thd->wsrep_trx().ws_meta().gtid()); - } + DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); + wsrep_xid_init(&thd->wsrep_xid, + thd->wsrep_trx().ws_meta().gtid()); } DBUG_RETURN(ret); } @@ -228,8 +287,8 @@ static inline int wsrep_ordered_commit(THD* thd, { DBUG_ENTER("wsrep_ordered_commit"); WSREP_DEBUG("wsrep_ordered_commit: %d", wsrep_is_real(thd, all)); - DBUG_RETURN(wsrep_run_commit_hook(thd, all) ? - thd->wsrep_cs().ordered_commit() : 0); + DBUG_ASSERT(wsrep_run_commit_hook(thd, all)); + DBUG_RETURN(thd->wsrep_cs().ordered_commit()); } /* @@ -244,14 +303,12 @@ static inline int wsrep_after_commit(THD* thd, bool all) wsrep_is_real(thd, all), wsrep_is_active(thd), (long long)wsrep_thd_trx_seqno(thd), - wsrep_has_changes(thd, all)); - if (wsrep_run_commit_hook(thd, all)) - { - DBUG_RETURN((wsrep_ordered_commit_if_no_binlog(thd, all) || - (thd->wsrep_xid.null(), - thd->wsrep_cs().after_commit()))); - } - DBUG_RETURN(0); + wsrep_has_changes(thd)); + DBUG_ASSERT(wsrep_run_commit_hook(thd, all)); + DBUG_RETURN((thd->wsrep_trx().state() == wsrep::transaction::s_committing + ? thd->wsrep_cs().ordered_commit() : 0) || + (thd->wsrep_xid.null(), + thd->wsrep_cs().after_commit())); } /* @@ -415,11 +472,25 @@ static inline void wsrep_commit_empty(THD* thd, bool all) thd->wsrep_trx().active() && thd->wsrep_trx().state() != wsrep::transaction::s_committed) { + /* @todo CTAS with STATEMENT binlog format and empty result set + seems to be committing empty. Figure out why and try to fix + elsewhere. */ + DBUG_ASSERT(!wsrep_has_changes(thd) || + (thd->lex->sql_command == SQLCOM_CREATE_TABLE && + !thd->is_current_stmt_binlog_format_row())); bool have_error= wsrep_current_error(thd); int ret= wsrep_before_rollback(thd, all) || wsrep_after_rollback(thd, all) || wsrep_after_statement(thd); - DBUG_ASSERT(have_error || !wsrep_current_error(thd)); + /* The committing transaction was empty but it held some locks and + got BF aborted. As there were no certified changes in the + data, we ignore the deadlock error and rely on error reporting + by storage engine/server. */ + if (!ret && !have_error && wsrep_current_error(thd)) + { + DBUG_ASSERT(wsrep_current_error(thd) == wsrep::e_deadlock_error); + thd->wsrep_cs().reset_error(); + } if (ret) { WSREP_DEBUG("wsrep_commit_empty failed: %d", wsrep_current_error(thd)); |