summaryrefslogtreecommitdiff
path: root/sql/handler.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/handler.cc')
-rw-r--r-- sql/handler.cc 339
1 files changed, 250 insertions, 89 deletions
diff --git a/sql/handler.cc b/sql/handler.cc
index 001055cd475..1b5aaebe3cf 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -54,8 +54,12 @@
#include "semisync_master.h"
#include "wsrep_mysqld.h"
-#include "wsrep.h"
+#ifdef WITH_WSREP
+#include "wsrep_binlog.h"
#include "wsrep_xid.h"
+#include "wsrep_thd.h"
+#include "wsrep_trans_observer.h" /* wsrep transaction hooks */
+#endif /* WITH_WSREP */
/*
While we have legacy_db_type, we have this array to
@@ -251,6 +255,9 @@ handlerton *ha_checktype(THD *thd, handlerton *hton, bool no_substitute)
if (no_substitute)
return NULL;
+#ifdef WITH_WSREP
+ (void)wsrep_after_rollback(thd, false);
+#endif /* WITH_WSREP */
return ha_default_handlerton(thd);
} /* ha_checktype */
@@ -1199,17 +1206,28 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht_arg)
static int prepare_or_error(handlerton *ht, THD *thd, bool all)
{
+ #ifdef WITH_WSREP
+ if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION &&
+ wsrep_before_prepare(thd, all))
+ {
+ return(1);
+ }
+#endif /* WITH_WSREP */
+
int err= ht->prepare(ht, thd, all);
status_var_increment(thd->status_var.ha_prepare_count);
if (err)
{
- /* avoid sending error, if we're going to replay the transaction */
-#ifdef WITH_WSREP
- if (ht != wsrep_hton ||
- err == EMSGSIZE || thd->wsrep_conflict_state != MUST_REPLAY)
-#endif
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
}
+#ifdef WITH_WSREP
+ if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION &&
+ wsrep_after_prepare(thd, all))
+ {
+ err= 1;
+ }
+#endif /* WITH_WSREP */
+
return err;
}
@@ -1394,7 +1412,7 @@ int ha_commit_trans(THD *thd, bool all)
}
#ifdef WITH_ARIA_STORAGE_ENGINE
- ha_maria::implicit_commit(thd, TRUE);
+ ha_maria::implicit_commit(thd, TRUE);
#endif
if (!ha_info)
@@ -1404,6 +1422,12 @@ int ha_commit_trans(THD *thd, bool all)
*/
if (is_real_trans)
thd->transaction.cleanup();
+#ifdef WITH_WSREP
+ if (WSREP(thd) && all && !error)
+ {
+ wsrep_commit_empty(thd, all);
+ }
+#endif /* WITH_WSREP */
DBUG_RETURN(0);
}
@@ -1489,7 +1513,28 @@ int ha_commit_trans(THD *thd, bool all)
if (trans->no_2pc || (rw_ha_count <= 1))
{
+#ifdef WITH_WSREP
+ /*
+ This commit will not go through log_and_order() where wsrep commit
+ ordering is normally done. Commit ordering must be done here.
+ */
+ bool run_wsrep_commit= (WSREP(thd) &&
+ rw_ha_count &&
+ wsrep_thd_is_local(thd) &&
+ wsrep_has_changes(thd, all));
+ if (run_wsrep_commit)
+ error= wsrep_before_commit(thd, all);
+ if (error)
+ {
+ ha_rollback_trans(thd, FALSE);
+ goto wsrep_err;
+ }
+#endif /* WITH_WSREP */
error= ha_commit_one_phase(thd, all);
+#ifdef WITH_WSREP
+ if (run_wsrep_commit)
+ error= wsrep_after_commit(thd, all);
+#endif /* WITH_WSREP */
goto done;
}
@@ -1521,10 +1566,14 @@ int ha_commit_trans(THD *thd, bool all)
DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE(););
#ifdef WITH_WSREP
- if (!error && WSREP_ON && wsrep_is_wsrep_xid(&thd->transaction.xid_state.xid))
+ if (!error && WSREP_ON)
{
- // xid was rewritten by wsrep
- xid= wsrep_xid_seqno(thd->transaction.xid_state.xid);
+ wsrep::seqno const s= wsrep_xid_seqno(thd->wsrep_xid);
+ if (!s.is_undefined())
+ {
+ // xid was rewritten by wsrep
+ xid= s.get();
+ }
}
#endif /* WITH_WSREP */
@@ -1533,18 +1582,35 @@ int ha_commit_trans(THD *thd, bool all)
error= commit_one_phase_2(thd, all, trans, is_real_trans);
goto done;
}
-
+#ifdef WITH_WSREP
+ if (wsrep_before_commit(thd, all))
+ goto wsrep_err;
+#endif /* WITH_WSREP */
DEBUG_SYNC(thd, "ha_commit_trans_before_log_and_order");
cookie= tc_log->log_and_order(thd, xid, all, need_prepare_ordered,
need_commit_ordered);
if (!cookie)
+ {
+ WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie);
goto err;
-
+ }
DEBUG_SYNC(thd, "ha_commit_trans_after_log_and_order");
DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE(););
error= commit_one_phase_2(thd, all, trans, is_real_trans) ? 2 : 0;
-
+#ifdef WITH_WSREP
+ if (error || wsrep_after_commit(thd, all))
+ {
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ if (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort)
+ {
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ (void)tc_log->unlog(cookie, xid);
+ goto wsrep_err;
+ }
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ }
+#endif /* WITH_WSREP */
DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE(););
if (tc_log->unlog(cookie, xid))
{
@@ -1566,6 +1632,19 @@ done:
goto end;
/* Come here if error and we need to rollback. */
+#ifdef WITH_WSREP
+wsrep_err:
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ if (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort)
+ {
+ WSREP_DEBUG("BF abort has happened after prepare & certify");
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ ha_rollback_trans(thd, TRUE);
+ }
+ else
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+
+#endif /* WITH_WSREP */
err:
error= 1; /* Transaction was rolled back */
/*
@@ -1575,7 +1654,11 @@ err:
*/
if (!(thd->rgi_slave && thd->rgi_slave->is_parallel_exec))
ha_rollback_trans(thd, all);
-
+ else
+ {
+ WSREP_DEBUG("rollback skipped %p %d",thd->rgi_slave,
+ thd->rgi_slave->is_parallel_exec);
+ }
end:
if (rw_trans && mdl_request.ticket)
{
@@ -1587,6 +1670,13 @@ end:
*/
thd->mdl_context.release_lock(mdl_request.ticket);
}
+#ifdef WITH_WSREP
+ if (WSREP(thd) && all && !error && (rw_ha_count == 0))
+ {
+ wsrep_commit_empty(thd, all);
+ }
+#endif /* WITH_WSREP */
+
DBUG_RETURN(error);
}
@@ -1744,6 +1834,9 @@ int ha_rollback_trans(THD *thd, bool all)
DBUG_RETURN(1);
}
+#ifdef WITH_WSREP
+ (void) wsrep_before_rollback(thd, all);
+#endif /* WITH_WSREP */
if (ha_info)
{
/* Close all cursors that can not survive ROLLBACK */
@@ -1759,9 +1852,9 @@ int ha_rollback_trans(THD *thd, bool all)
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
error=1;
#ifdef WITH_WSREP
- WSREP_WARN("handlerton rollback failed, thd %llu %lld conf %d SQL %s",
- thd->thread_id, thd->query_id, thd->wsrep_conflict_state,
- thd->query());
+ WSREP_WARN("handlerton rollback failed, thd %lld %lld conf %d SQL %s",
+ thd->thread_id, thd->query_id, thd->wsrep_trx().state(),
+ thd->query());
#endif /* WITH_WSREP */
}
status_var_increment(thd->status_var.ha_rollback_count);
@@ -1780,6 +1873,15 @@ int ha_rollback_trans(THD *thd, bool all)
thd->transaction.xid_state.xa_state != XA_NOTR)
thd->transaction.xid_state.rm_error= thd->get_stmt_da()->sql_errno();
+#ifdef WITH_WSREP
+ if (thd->is_error())
+ {
+ WSREP_DEBUG("ha_rollback_trans(%lld, %s) rolled back: %s: %s; is_real %d",
+ thd->thread_id, all?"TRUE":"FALSE", WSREP_QUERY(thd),
+ thd->get_stmt_da()->message(), is_real_trans);
+ }
+ (void) wsrep_after_rollback(thd, all);
+#endif /* WITH_WSREP */
/* Always cleanup. Even if nht==0. There may be savepoints. */
if (is_real_trans)
{
@@ -1913,6 +2015,28 @@ static char* xid_to_str(char *buf, XID *xid)
}
#endif
+#ifdef WITH_WSREP
+static my_xid wsrep_order_and_check_continuity(XID *list, int len)
+{
+ wsrep_sort_xid_array(list, len);
+ wsrep::gtid cur_position= wsrep_get_SE_checkpoint();
+ long long cur_seqno= cur_position.seqno().get();
+ for (int i= 0; i < len; ++i)
+ {
+ if (!wsrep_is_wsrep_xid(list + i) ||
+ wsrep_xid_seqno(list + i) != cur_seqno + 1)
+ {
+ WSREP_WARN("Discovered discontinuity in recovered wsrep "
+ "transaction XIDs. Truncating the recovery list to "
+ "%d entries", i);
+ break;
+ }
+ ++cur_seqno;
+ }
+ WSREP_INFO("Last wsrep seqno to be recovered %lld", cur_seqno);
+ return (cur_seqno < 0 ? 0 : cur_seqno);
+}
+#endif /* WITH_WSREP */
/**
recover() step of xa.
@@ -1950,10 +2074,32 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin,
{
sql_print_information("Found %d prepared transaction(s) in %s",
got, hton_name(hton)->str);
+#ifdef WITH_WSREP
+ /* If wsrep_on=ON, XIDs are first ordered and then the range of
+ recovered XIDs is checked for continuity. All the XIDs which
+ are in a continuous range can be safely committed if binlog
+ is off since they have already been ordered and certified in
+ the cluster.
+
+ A discontinuity in wsrep XIDs may happen because the GTID
+ is assigned to the transaction in wsrep_before_prepare(), but the
+ commit order is entered in wsrep_before_commit(). This means that
+ transactions may run the prepare step out of order, which may
+ result in a gap in wsrep XIDs. This can be the case, for example,
+ if we have T1 with seqno 1 and T2 with seqno 2 and the server
+ crashes after T2 finishes the prepare step but before T1 starts
+ the prepare.
+ */
+ my_xid wsrep_limit= 0;
+ if (WSREP_ON)
+ {
+ wsrep_limit= wsrep_order_and_check_continuity(info->list, got);
+ }
+#endif /* WITH_WSREP */
for (int i=0; i < got; i ++)
{
my_xid x= IF_WSREP(WSREP_ON && wsrep_is_wsrep_xid(&info->list[i]) ?
- wsrep_xid_seqno(info->list[i]) :
+ wsrep_xid_seqno(&info->list[i]) :
info->list[i].get_my_xid(),
info->list[i].get_my_xid());
if (!x) // not "mine" - that is generated by external TM
@@ -1972,9 +2118,12 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin,
continue;
}
// recovery mode
- if (info->commit_list ?
- my_hash_search(info->commit_list, (uchar *)&x, sizeof(x)) != 0 :
- tc_heuristic_recover == TC_HEURISTIC_RECOVER_COMMIT)
+ if (IF_WSREP((wsrep_emulate_bin_log &&
+ wsrep_is_wsrep_xid(info->list + i) &&
+ x <= wsrep_limit), false) ||
+ (info->commit_list ?
+ my_hash_search(info->commit_list, (uchar *)&x, sizeof(x)) != 0 :
+ tc_heuristic_recover == TC_HEURISTIC_RECOVER_COMMIT))
{
#ifndef DBUG_OFF
int rc=
@@ -2332,11 +2481,26 @@ int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv)
{
int err;
handlerton *ht= ha_info->ht();
+#ifdef WITH_WSREP
+ if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION)
+ {
+ WSREP_DEBUG("ha_rollback_to_savepoint: run before_rollbackha_rollback_trans hook");
+ (void) wsrep_before_rollback(thd, !thd->in_sub_stmt);
+
+ }
+#endif // WITH_WSREP
if ((err= ht->rollback(ht, thd, !thd->in_sub_stmt)))
{ // cannot happen
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
error=1;
}
+#ifdef WITH_WSREP
+ if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION)
+ {
+ WSREP_DEBUG("ha_rollback_to_savepoint: run after_rollback hook");
+ (void) wsrep_after_rollback(thd, !thd->in_sub_stmt);
+ }
+#endif // WITH_WSREP
status_var_increment(thd->status_var.ha_rollback_count);
ha_info_next= ha_info->next();
ha_info->reset(); /* keep it conveniently zero-filled */
@@ -2353,6 +2517,16 @@ int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv)
*/
int ha_savepoint(THD *thd, SAVEPOINT *sv)
{
+#ifdef WITH_WSREP
+ /*
+ Register binlog hton for savepoint processing if wsrep binlog
+ emulation is on.
+ */
+ if (WSREP_EMULATE_BINLOG(thd) && wsrep_thd_is_local(thd))
+ {
+ wsrep_register_binlog_handler(thd, thd->in_multi_stmt_transaction_mode());
+ }
+#endif /* WITH_WSREP */
int error=0;
THD_TRANS *trans= (thd->in_sub_stmt ? &thd->transaction.stmt :
&thd->transaction.all);
@@ -5968,6 +6142,12 @@ bool handler::check_table_binlog_row_based(bool binlog_row)
return false;
if (unlikely((table->in_use->variables.sql_log_bin_off)))
return 0; /* Called by partitioning engine */
+#ifdef WITH_WSREP
+ if (!table->in_use->variables.sql_log_bin &&
+ wsrep_thd_is_applying(table->in_use))
+ return 0; /* wsrep patch sets sql_log_bin to silence binlogging
+ from high priority threads */
+#endif /* WITH_WSREP */
if (unlikely((!check_table_binlog_row_based_done)))
{
check_table_binlog_row_based_done= 1;
@@ -5998,12 +6178,12 @@ bool handler::check_table_binlog_row_based_internal(bool binlog_row)
Otherwise, return 'true' if binary logging is on.
*/
IF_WSREP(((WSREP_EMULATE_BINLOG(thd) &&
- (thd->wsrep_exec_mode != REPL_RECV)) ||
+ wsrep_thd_is_local(thd)) ||
((WSREP(thd) ||
(thd->variables.option_bits & OPTION_BIN_LOG)) &&
mysql_bin_log.is_open())),
- (thd->variables.option_bits & OPTION_BIN_LOG) &&
- mysql_bin_log.is_open()));
+ (thd->variables.option_bits & OPTION_BIN_LOG) &&
+ mysql_bin_log.is_open()));
}
@@ -6128,23 +6308,9 @@ int binlog_log_row(TABLE* table, const uchar *before_record,
/* only InnoDB tables will be replicated through binlog emulation */
if ((WSREP_EMULATE_BINLOG(thd) &&
- table->file->partition_ht()->db_type != DB_TYPE_INNODB) ||
- (thd->wsrep_ignore_table == true))
+ !(table->file->partition_ht()->flags & HTON_WSREP_REPLICATION)) ||
+ thd->wsrep_ignore_table == true)
return 0;
-
- /* enforce wsrep_max_ws_rows */
- if (WSREP(thd) && table->s->tmp_table == NO_TMP_TABLE)
- {
- thd->wsrep_affected_rows++;
- if (wsrep_max_ws_rows &&
- thd->wsrep_exec_mode != REPL_RECV &&
- thd->wsrep_affected_rows > wsrep_max_ws_rows)
- {
- trans_rollback_stmt(thd) || trans_rollback(thd);
- my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
- return ER_ERROR_DURING_COMMIT;
- }
- }
#endif
if (!table->file->check_table_binlog_row_based(1))
@@ -6256,6 +6422,27 @@ int handler::ha_reset()
DBUG_RETURN(reset());
}
+#ifdef WITH_WSREP
+static int wsrep_after_row(THD *thd)
+{
+ DBUG_ENTER("wsrep_after_row");
+ /* enforce wsrep_max_ws_rows */
+ thd->wsrep_affected_rows++;
+ if (wsrep_max_ws_rows &&
+ wsrep_thd_is_local(thd) &&
+ thd->wsrep_affected_rows > wsrep_max_ws_rows)
+ {
+ trans_rollback_stmt(thd) || trans_rollback(thd);
+ my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
+ DBUG_RETURN(ER_ERROR_DURING_COMMIT);
+ }
+ else if (wsrep_after_row(thd, false))
+ {
+ DBUG_RETURN(ER_LOCK_DEADLOCK);
+ }
+ DBUG_RETURN(0);
+}
+#endif /* WITH_WSREP */
int handler::ha_write_row(uchar *buf)
{
@@ -6278,7 +6465,15 @@ int handler::ha_write_row(uchar *buf)
{
rows_changed++;
error= binlog_log_row(table, 0, buf, log_func);
+#ifdef WITH_WSREP
+ if (table_share->tmp_table == NO_TMP_TABLE &&
+ WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd())))
+ {
+ DBUG_RETURN(error);
+ }
+#endif /* WITH_WSREP */
}
+
DEBUG_SYNC_C("ha_write_row_end");
DBUG_RETURN(error);
}
@@ -6310,6 +6505,13 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data)
{
rows_changed++;
error= binlog_log_row(table, old_data, new_data, log_func);
+#ifdef WITH_WSREP
+ if (table_share->tmp_table == NO_TMP_TABLE &&
+ WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd())))
+ {
+ return error;
+ }
+#endif /* WITH_WSREP */
}
return error;
}
@@ -6365,6 +6567,13 @@ int handler::ha_delete_row(const uchar *buf)
{
rows_changed++;
error= binlog_log_row(table, buf, 0, log_func);
+#ifdef WITH_WSREP
+ if (table_share->tmp_table == NO_TMP_TABLE &&
+ WSREP(ha_thd()) && (error= wsrep_after_row(ha_thd())))
+ {
+ return error;
+ }
+#endif /* WITH_WSREP */
}
return error;
}
@@ -6554,7 +6763,7 @@ int ha_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal)
DBUG_ENTER("ha_abort_transaction");
if (!WSREP(bf_thd) &&
!(bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU &&
- bf_thd->wsrep_exec_mode == TOTAL_ORDER)) {
+ wsrep_thd_is_toi(bf_thd))) {
DBUG_RETURN(0);
}
@@ -6570,54 +6779,6 @@ int ha_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal)
DBUG_RETURN(0);
}
-
-void ha_fake_trx_id(THD *thd)
-{
- DBUG_ENTER("ha_fake_trx_id");
-
- bool no_fake_trx_id= true;
-
- if (!WSREP(thd))
- {
- DBUG_VOID_RETURN;
- }
-
- if (thd->wsrep_ws_handle.trx_id != WSREP_UNDEFINED_TRX_ID)
- {
- WSREP_DEBUG("fake trx id skipped: %" PRIu64, thd->wsrep_ws_handle.trx_id);
- DBUG_VOID_RETURN;
- }
-
- /* Try statement transaction if standard one is not set. */
- THD_TRANS *trans= (thd->transaction.all.ha_list) ? &thd->transaction.all :
- &thd->transaction.stmt;
-
- Ha_trx_info *ha_info= trans->ha_list, *ha_info_next;
-
- for (; ha_info; ha_info= ha_info_next)
- {
- handlerton *hton= ha_info->ht();
- if (hton->fake_trx_id)
- {
- hton->fake_trx_id(hton, thd);
-
- /* Got a fake trx id. */
- no_fake_trx_id= false;
-
- /*
- We need transaction ID from just one storage engine providing
- fake_trx_id (which will most likely be the case).
- */
- break;
- }
- ha_info_next= ha_info->next();
- }
-
- if (unlikely(no_fake_trx_id))
- WSREP_WARN("Cannot get fake transaction ID from storage engine.");
-
- DBUG_VOID_RETURN;
-}
#endif /* WITH_WSREP */