summaryrefslogtreecommitdiff
path: root/sql/wsrep_trans_observer.h
diff options
context:
space:
mode:
authorTeemu Ollakka <teemu.ollakka@galeracluster.com>2019-02-21 21:57:52 +0200
committerJan Lindström <jan.lindstrom@mariadb.com>2019-03-04 09:00:25 +0200
commita8ff4f243da2114a03aabc6513dc62acd308c0e3 (patch)
tree5b5f3e7091cdb8e9effc8da578d5fa8d4ae75404 /sql/wsrep_trans_observer.h
parentfa52846bc7fd989691ed3277260570e86d6d199c (diff)
downloadmariadb-git-a8ff4f243da2114a03aabc6513dc62acd308c0e3.tar.gz
MDEV-18631 Fix streaming replication with wsrep_gtid_mode=ON
With wsrep_gtid_mode=ON, the appropriate commit hooks were not called in all cases for applied streaming transactions. As a fix, removed all special handling of commit order critical section from Wsrep_high_priority_service and Wsrep_storage_service. Now commit order critical section is always entered in ha_commit_trans(). Check for wsrep_run_commit_hook is now done in handler.cc, log.cc. This makes it explicit that the transaction is an active wsrep transaction which must go through commit hooks.
Diffstat (limited to 'sql/wsrep_trans_observer.h')
-rw-r--r--sql/wsrep_trans_observer.h133
1 files changed, 102 insertions, 31 deletions
diff --git a/sql/wsrep_trans_observer.h b/sql/wsrep_trans_observer.h
index 04a1224701e..46b9cf75976 100644
--- a/sql/wsrep_trans_observer.h
+++ b/sql/wsrep_trans_observer.h
@@ -1,4 +1,4 @@
-/* Copyright 2016 Codership Oy <http://www.codership.com>
+/* Copyright 2016-2019 Codership Oy <http://www.codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -32,7 +32,39 @@ class THD;
static inline bool wsrep_is_active(THD* thd)
{
return (thd->wsrep_cs().state() != wsrep::client_state::s_none &&
- thd->wsrep_cs().transaction().active());
+ thd->wsrep_cs().transaction().active());
+}
+
+/*
+ Return true if transaction is ordered.
+ */
+static inline bool wsrep_is_ordered(THD* thd)
+{
+ return thd->wsrep_trx().ordered();
+}
+
+/*
+ Return true if transaction has been BF aborted but has not been
+ rolled back yet.
+
+ It is required that the caller holds thd->LOCK_thd_data.
+*/
+static inline bool wsrep_must_abort(THD* thd)
+{
+ mysql_mutex_assert_owner(&thd->LOCK_thd_data);
+ return (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort);
+}
+
+/*
+ Return true if transaction has not been committed.
+
+ Note that we don't require thd->LOCK_thd_data here. Calling this method
+ makes sense only from codepaths which are past ordered_commit state
+ and the wsrep transaction is immune to BF aborts at that point.
+*/
+static inline bool wsrep_not_committed(THD* thd)
+{
+ return (thd->wsrep_trx().state() != wsrep::transaction::s_committed);
}
/*
@@ -47,7 +79,7 @@ static inline bool wsrep_is_real(THD* thd, bool all)
/*
Check if a transaction has generated changes.
*/
-static inline bool wsrep_has_changes(THD* thd, my_bool all)
+static inline bool wsrep_has_changes(THD* thd)
{
return (thd->wsrep_trx().is_empty() == false);
}
@@ -137,11 +169,42 @@ static inline int wsrep_after_row(THD* thd, bool)
/*
Helper method to determine whether commit time hooks
should be run for the transaction.
+
+ Commit hooks must be run in the following cases:
+ - The transaction is local and has generated write set and is committing.
+ - The transaction has been BF aborted
+ - Is running in high priority mode and is ordered. This can be replayer,
+ applier or storage access.
*/
static inline bool wsrep_run_commit_hook(THD* thd, bool all)
{
- return (wsrep_is_real(thd, all) && wsrep_is_active(thd) &&
- (wsrep_thd_is_applying(thd) || wsrep_has_changes(thd, all)));
+ DBUG_ENTER("wsrep_run_commit_hook");
+ DBUG_PRINT("wsrep", ("Is_active: %d is_real %d has_changes %d is_applying %d "
+ "is_ordered: %d",
+ wsrep_is_active(thd), wsrep_is_real(thd, all),
+ wsrep_has_changes(thd), wsrep_thd_is_applying(thd),
+ wsrep_is_ordered(thd)));
+ /* Is MST commit or autocommit? */
+ bool ret= wsrep_is_active(thd) && wsrep_is_real(thd, all);
+ if (ret && !(wsrep_has_changes(thd) || /* Has generated write set */
+ /* Is high priority (replay, applier, storage) and the
+ transaction is scheduled for commit ordering */
+ (wsrep_thd_is_applying(thd) && wsrep_is_ordered(thd))))
+ {
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ DBUG_PRINT("wsrep", ("state: %s",
+ wsrep::to_c_string(thd->wsrep_trx().state())));
+ /* Transaction is local but has no changes, the commit hooks will
+ be skipped and the wsrep transaction is terminated in
+ wsrep_commit_empty() */
+ if (thd->wsrep_trx().state() == wsrep::transaction::s_executing)
+ {
+ ret= false;
+ }
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ }
+ DBUG_PRINT("wsrep", ("return: %d", ret));
+ DBUG_RETURN(ret);
}
/*
@@ -154,14 +217,12 @@ static inline int wsrep_before_prepare(THD* thd, bool all)
DBUG_ENTER("wsrep_before_prepare");
WSREP_DEBUG("wsrep_before_prepare: %d", wsrep_is_real(thd, all));
int ret= 0;
- if (wsrep_run_commit_hook(thd, all))
+ DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
+ if ((ret= thd->wsrep_cs().before_prepare()) == 0)
{
- if ((ret= thd->wsrep_cs().before_prepare()) == 0)
- {
- DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined());
- wsrep_xid_init(&thd->wsrep_xid,
+ DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined());
+ wsrep_xid_init(&thd->wsrep_xid,
thd->wsrep_trx().ws_meta().gtid());
- }
}
DBUG_RETURN(ret);
}
@@ -175,8 +236,8 @@ static inline int wsrep_after_prepare(THD* thd, bool all)
{
DBUG_ENTER("wsrep_after_prepare");
WSREP_DEBUG("wsrep_after_prepare: %d", wsrep_is_real(thd, all));
- int ret= (wsrep_run_commit_hook(thd, all) ?
- thd->wsrep_cs().after_prepare() : 0);
+ DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
+ int ret= thd->wsrep_cs().after_prepare();
DBUG_ASSERT(ret == 0 || thd->wsrep_cs().current_error() ||
thd->wsrep_cs().transaction().state() == wsrep::transaction::s_must_replay);
DBUG_RETURN(ret);
@@ -198,14 +259,12 @@ static inline int wsrep_before_commit(THD* thd, bool all)
wsrep_is_real(thd, all),
(long long)wsrep_thd_trx_seqno(thd));
int ret= 0;
- if (wsrep_run_commit_hook(thd, all))
+ DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
+ if ((ret= thd->wsrep_cs().before_commit()) == 0)
{
- if ((ret= thd->wsrep_cs().before_commit()) == 0)
- {
- DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined());
- wsrep_xid_init(&thd->wsrep_xid,
- thd->wsrep_trx().ws_meta().gtid());
- }
+ DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined());
+ wsrep_xid_init(&thd->wsrep_xid,
+ thd->wsrep_trx().ws_meta().gtid());
}
DBUG_RETURN(ret);
}
@@ -228,8 +287,8 @@ static inline int wsrep_ordered_commit(THD* thd,
{
DBUG_ENTER("wsrep_ordered_commit");
WSREP_DEBUG("wsrep_ordered_commit: %d", wsrep_is_real(thd, all));
- DBUG_RETURN(wsrep_run_commit_hook(thd, all) ?
- thd->wsrep_cs().ordered_commit() : 0);
+ DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
+ DBUG_RETURN(thd->wsrep_cs().ordered_commit());
}
/*
@@ -244,14 +303,12 @@ static inline int wsrep_after_commit(THD* thd, bool all)
wsrep_is_real(thd, all),
wsrep_is_active(thd),
(long long)wsrep_thd_trx_seqno(thd),
- wsrep_has_changes(thd, all));
- if (wsrep_run_commit_hook(thd, all))
- {
- DBUG_RETURN((wsrep_ordered_commit_if_no_binlog(thd, all) ||
- (thd->wsrep_xid.null(),
- thd->wsrep_cs().after_commit())));
- }
- DBUG_RETURN(0);
+ wsrep_has_changes(thd));
+ DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
+ DBUG_RETURN((thd->wsrep_trx().state() == wsrep::transaction::s_committing
+ ? thd->wsrep_cs().ordered_commit() : 0) ||
+ (thd->wsrep_xid.null(),
+ thd->wsrep_cs().after_commit()));
}
/*
@@ -415,11 +472,25 @@ static inline void wsrep_commit_empty(THD* thd, bool all)
thd->wsrep_trx().active() &&
thd->wsrep_trx().state() != wsrep::transaction::s_committed)
{
+ /* @todo CTAS with STATEMENT binlog format and empty result set
+ seems to be committing empty. Figure out why and try to fix
+ elsewhere. */
+ DBUG_ASSERT(!wsrep_has_changes(thd) ||
+ (thd->lex->sql_command == SQLCOM_CREATE_TABLE &&
+ !thd->is_current_stmt_binlog_format_row()));
bool have_error= wsrep_current_error(thd);
int ret= wsrep_before_rollback(thd, all) ||
wsrep_after_rollback(thd, all) ||
wsrep_after_statement(thd);
- DBUG_ASSERT(have_error || !wsrep_current_error(thd));
+ /* The committing transaction was empty but it held some locks and
+ got BF aborted. As there were no certified changes in the
+ data, we ignore the deadlock error and rely on error reporting
+ by storage engine/server. */
+ if (!ret && !have_error && wsrep_current_error(thd))
+ {
+ DBUG_ASSERT(wsrep_current_error(thd) == wsrep::e_deadlock_error);
+ thd->wsrep_cs().reset_error();
+ }
if (ret)
{
WSREP_DEBUG("wsrep_commit_empty failed: %d", wsrep_current_error(thd));