summaryrefslogtreecommitdiff
path: root/sql/handler.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/handler.cc')
-rw-r--r--sql/handler.cc216
1 files changed, 155 insertions, 61 deletions
diff --git a/sql/handler.cc b/sql/handler.cc
index b3481c7e429..cb3f156be81 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -55,6 +55,12 @@
#include "wsrep_mysqld.h"
#include "wsrep.h"
#include "wsrep_xid.h"
+#ifdef WITH_WSREP
+#include "wsrep_sr.h"
+#endif
+#include "wsrep_thd.h"
+#include "wsrep_trans_observer.h" /* wsrep transaction hooks */
+#include "log.h"
/*
While we have legacy_db_type, we have this array to
@@ -1168,8 +1174,8 @@ static int prepare_or_error(handlerton *ht, THD *thd, bool all)
{
/* avoid sending error, if we're going to replay the transaction */
#ifdef WITH_WSREP
- if (ht != wsrep_hton ||
- err == EMSGSIZE || thd->wsrep_conflict_state != MUST_REPLAY)
+ if (ht->db_type != DB_TYPE_UNKNOWN ||
+ err == EMSGSIZE || thd->wsrep_conflict_state() != MUST_REPLAY)
#endif
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
}
@@ -1248,6 +1254,18 @@ ha_check_and_coalesce_trx_read_only(THD *thd, Ha_trx_info *ha_list,
for (ha_info= ha_list; ha_info; ha_info= ha_info->next())
{
+#ifdef WITH_WSREP
+ /*
+ Thd has wsrep_schema.SR open and may operate it
+ during prepare phase, set InnoDB ha_info read_write.
+ */
+ if (WSREP_CLIENT(thd) && thd->wsrep_is_streaming() &&
+ wsrep_SR_store && wsrep_SR_store_type == WSREP_SR_STORE_TABLE &&
+ ha_info->ht()->db_type == DB_TYPE_INNODB)
+ {
+ ha_info->set_trx_read_write();
+ }
+#endif /* WITH_WSREP */
if (ha_info->is_trx_read_write())
++rw_ha_count;
@@ -1460,6 +1478,13 @@ int ha_commit_trans(THD *thd, bool all)
need_prepare_ordered= FALSE;
need_commit_ordered= FALSE;
xid= thd->transaction.xid_state.xid.get_my_xid();
+#ifdef WITH_WSREP
+ if (WSREP(thd) && wsrep_before_prepare(thd, all))
+ {
+ wsrep_override_error(thd, ER_ERROR_DURING_COMMIT);
+ goto wsrep_err;
+ }
+#endif /* WITH_WSREP */
for (Ha_trx_info *hi= ha_info; hi; hi= hi->next())
{
@@ -1483,6 +1508,13 @@ int ha_commit_trans(THD *thd, bool all)
}
DEBUG_SYNC(thd, "ha_commit_trans_after_prepare");
DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE(););
+#ifdef WITH_WSREP
+ if (WSREP(thd) && wsrep_after_prepare(thd, all))
+ {
+ wsrep_override_error(thd, ER_ERROR_DURING_COMMIT);
+ goto err;
+ }
+#endif /* WITH_WSREP */
#ifdef WITH_WSREP
if (!error && WSREP_ON && wsrep_is_wsrep_xid(&thd->transaction.xid_state.xid))
@@ -1502,13 +1534,29 @@ int ha_commit_trans(THD *thd, bool all)
cookie= tc_log->log_and_order(thd, xid, all, need_prepare_ordered,
need_commit_ordered);
if (!cookie)
+ {
+ WSREP_DEBUG("log_and_order has failed %lu %d", thd->thread_id, cookie);
goto err;
+ }
DEBUG_SYNC(thd, "ha_commit_trans_after_log_and_order");
DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE(););
error= commit_one_phase_2(thd, all, trans, is_real_trans) ? 2 : 0;
-
+#ifdef WITH_WSREP
+ if (error)
+ {
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ if (thd->wsrep_exec_mode == LOCAL_COMMIT &&
+ thd->wsrep_conflict_state() == MUST_ABORT)
+ {
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ (void)tc_log->unlog(cookie, xid);
+ goto wsrep_err;
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+}
+#endif /* WITH_WSREP */
DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE(););
if (tc_log->unlog(cookie, xid))
{
@@ -1530,6 +1578,20 @@ done:
goto end;
/* Come here if error and we need to rollback. */
+#ifdef WITH_WSREP
+ wsrep_err:
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ if (thd->wsrep_exec_mode == LOCAL_COMMIT &&
+ thd->wsrep_conflict_state() == MUST_ABORT)
+ {
+ WSREP_DEBUG("BF abort has happened after prepare & certify");
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ ha_rollback_trans(thd, TRUE);
+ }
+ else
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+#endif /* WITH_WSREP */
err:
error= 1; /* Transaction was rolled back */
/*
@@ -1539,7 +1601,10 @@ err:
*/
if (!(thd->rgi_slave && thd->rgi_slave->is_parallel_exec))
ha_rollback_trans(thd, all);
-
+ else
+ {
+ WSREP_DEBUG("WTF, rollback skipped %d %d",thd->rgi_slave, thd->rgi_slave->is_parallel_exec);
+ }
end:
if (rw_trans && mdl_request.ticket)
{
@@ -1673,7 +1738,6 @@ int ha_rollback_trans(THD *thd, bool all)
*/
DBUG_ASSERT(thd->transaction.stmt.ha_list == NULL ||
trans == &thd->transaction.stmt);
-
#ifdef HAVE_REPLICATION
if (is_real_trans)
{
@@ -1708,6 +1772,9 @@ int ha_rollback_trans(THD *thd, bool all)
DBUG_RETURN(1);
}
+#ifdef WITH_WSREP
+ (void) wsrep_before_rollback(thd, all);
+#endif // WITH_WSREP
if (ha_info)
{
/* Close all cursors that can not survive ROLLBACK */
@@ -1723,9 +1790,9 @@ int ha_rollback_trans(THD *thd, bool all)
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
error=1;
#ifdef WITH_WSREP
- WSREP_WARN("handlerton rollback failed, thd %llu %lld conf %d SQL %s",
- thd->thread_id, thd->query_id, thd->wsrep_conflict_state,
- thd->query());
+ WSREP_WARN("handlerton rollback failed, thd %lld %lld conf %d SQL %s",
+ thd->thread_id, thd->query_id, thd->wsrep_conflict_state(),
+ thd->query());
#endif /* WITH_WSREP */
}
status_var_increment(thd->status_var.ha_rollback_count);
@@ -1744,6 +1811,15 @@ int ha_rollback_trans(THD *thd, bool all)
thd->transaction.xid_state.xa_state != XA_NOTR)
thd->transaction.xid_state.rm_error= thd->get_stmt_da()->sql_errno();
+#ifdef WITH_WSREP
+ if (thd->is_error())
+ {
+ WSREP_DEBUG("ha_rollback_trans(%lld, %s) rolled back: %s: %s; is_real %d",
+ thd->thread_id, all?"TRUE":"FALSE", WSREP_QUERY(thd),
+ thd->get_stmt_da()->message(), is_real_trans);
+ }
+ (void) wsrep_after_rollback(thd, all);
+#endif
/* Always cleanup. Even if nht==0. There may be savepoints. */
if (is_real_trans)
{
@@ -2296,6 +2372,14 @@ int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv)
{
int err;
handlerton *ht= ha_info->ht();
+#ifdef WITH_WSREP
+ if (ht->db_type == DB_TYPE_INNODB)
+ {
+ WSREP_DEBUG("ha_rollback_to_savepoint: run before_rollbackha_rollback_trans hook");
+ (void) wsrep_before_rollback(thd, !thd->in_sub_stmt);
+
+ }
+#endif // WITH_WSREP
if ((err= ht->rollback(ht, thd, !thd->in_sub_stmt)))
{ // cannot happen
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
@@ -6021,6 +6105,16 @@ static int binlog_log_row_internal(TABLE* table,
bool error= 0;
THD *const thd= table->in_use;
+#ifdef WITH_WSREP
+ /* only InnoDB tables will be replicated through binlog emulation */
+ if (WSREP_EMULATE_BINLOG(thd) &&
+ table->file->ht->db_type != DB_TYPE_INNODB &&
+ !(table->file->ht->db_type == DB_TYPE_PARTITION_DB &&
+ (((ha_partition*)(table->file))->wsrep_db_type() == DB_TYPE_INNODB)))
+ {
+ return 0;
+ }
+#endif /* WITH_WSREP */
/*
If there are no table maps written to the binary log, this is
the first row handled in this statement. In that case, we need
@@ -6055,20 +6149,6 @@ int binlog_log_row(TABLE* table, const uchar *before_record,
table->file->partition_ht()->db_type != DB_TYPE_INNODB) ||
(thd->wsrep_ignore_table == true))
return 0;
-
- /* enforce wsrep_max_ws_rows */
- if (WSREP(thd) && table->s->tmp_table == NO_TMP_TABLE)
- {
- thd->wsrep_affected_rows++;
- if (wsrep_max_ws_rows &&
- thd->wsrep_exec_mode != REPL_RECV &&
- thd->wsrep_affected_rows > wsrep_max_ws_rows)
- {
- trans_rollback_stmt(thd) || trans_rollback(thd);
- my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
- return ER_ERROR_DURING_COMMIT;
- }
- }
#endif
if (!table->file->check_table_binlog_row_based(1))
@@ -6180,6 +6260,31 @@ int handler::ha_reset()
DBUG_RETURN(reset());
}
+#ifdef WITH_WSREP
+static int wsrep_after_row(THD *thd)
+{
+ DBUG_ENTER("wsrep_after_row");
+ /* enforce wsrep_max_ws_rows */
+ thd->wsrep_affected_rows++;
+ if (wsrep_max_ws_rows &&
+ thd->wsrep_exec_mode != REPL_RECV &&
+ thd->wsrep_affected_rows > wsrep_max_ws_rows)
+ {
+ trans_rollback_stmt(thd) || trans_rollback(thd);
+ my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
+ DBUG_RETURN(ER_ERROR_DURING_COMMIT);
+ }
+ else if (wsrep_after_row(thd, false))
+ {
+ if (!thd->get_stmt_da()->is_error())
+ {
+ wsrep_override_error(thd, ER_LOCK_DEADLOCK);
+ }
+ DBUG_RETURN(ER_LOCK_DEADLOCK);
+ }
+ DBUG_RETURN(0);
+}
+#endif /* WITH_WSREP */
int handler::ha_write_row(uchar *buf)
{
@@ -6202,7 +6307,16 @@ int handler::ha_write_row(uchar *buf)
{
rows_changed++;
error= binlog_log_row(table, 0, buf, log_func);
+#ifdef WITH_WSREP
+ THD *thd= current_thd;
+ if (table_share->tmp_table == NO_TMP_TABLE &&
+ WSREP(thd) && (error= wsrep_after_row(thd)))
+ {
+ DBUG_RETURN(error);
+ }
+#endif /* WITH_WSREP */
}
+
DEBUG_SYNC_C("ha_write_row_end");
DBUG_RETURN(error);
}
@@ -6234,6 +6348,14 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data)
{
rows_changed++;
error= binlog_log_row(table, old_data, new_data, log_func);
+#ifdef WITH_WSREP
+ THD *thd= current_thd;
+ if (table_share->tmp_table == NO_TMP_TABLE &&
+ WSREP(thd) && (error= wsrep_after_row(thd)))
+ {
+ return error;
+ }
+#endif /* WITH_WSREP */
}
return error;
}
@@ -6289,6 +6411,14 @@ int handler::ha_delete_row(const uchar *buf)
{
rows_changed++;
error= binlog_log_row(table, buf, 0, log_func);
+#ifdef WITH_WSREP
+ THD *thd= current_thd;
+ if (table_share->tmp_table == NO_TMP_TABLE &&
+ WSREP(thd) && (error= wsrep_after_row(thd)))
+ {
+ return error;
+ }
+#endif /* WITH_WSREP */
}
return error;
}
@@ -6494,51 +6624,15 @@ int ha_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal)
DBUG_RETURN(0);
}
-
void ha_fake_trx_id(THD *thd)
{
- DBUG_ENTER("ha_fake_trx_id");
-
- bool no_fake_trx_id= true;
-
- if (!WSREP(thd))
- {
- DBUG_VOID_RETURN;
- }
-
- if (thd->wsrep_ws_handle.trx_id != WSREP_UNDEFINED_TRX_ID)
+ DBUG_ENTER("ha_wsrep_fake_trx_id");
+ if (!WSREP(thd))
{
- WSREP_DEBUG("fake trx id skipped: %lu", thd->wsrep_ws_handle.trx_id);
DBUG_VOID_RETURN;
}
- /* Try statement transaction if standard one is not set. */
- THD_TRANS *trans= (thd->transaction.all.ha_list) ? &thd->transaction.all :
- &thd->transaction.stmt;
-
- Ha_trx_info *ha_info= trans->ha_list, *ha_info_next;
-
- for (; ha_info; ha_info= ha_info_next)
- {
- handlerton *hton= ha_info->ht();
- if (hton->fake_trx_id)
- {
- hton->fake_trx_id(hton, thd);
-
- /* Got a fake trx id. */
- no_fake_trx_id= false;
-
- /*
- We need transaction ID from just one storage engine providing
- fake_trx_id (which will most likely be the case).
- */
- break;
- }
- ha_info_next= ha_info->next();
- }
-
- if (unlikely(no_fake_trx_id))
- WSREP_WARN("Cannot get fake transaction ID from storage engine.");
+ (void *)wsrep_ws_handle_for_trx(&thd->wsrep_ws_handle, thd->query_id);
DBUG_VOID_RETURN;
}